diff --git a/README.md b/README.md index e684de7..e5af1a0 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ This section provides a brief overview of the contained packages & evaluations p see the `CONTAINED PACKAGES` section in the documentation for a more detailed description of the packages, their functionality and usage, as well as the API reference for each package and examples demonstrating the usage of the packages. Evaluation results are also provided for some of the packages (On-Demand Video Decoder, -Batching Helpers, DALI Pipeline Framework). +Batching Helpers, Multi-Tensor Copier, DALI Pipeline Framework). > **ℹ️ Note**: We are planning to add demos for packages contained in ACCV-Lab in the future. Apart from > acting as tutorials show-casing real-world examples, they will include the implementation of the experiments @@ -34,6 +34,10 @@ The contained packages are: training. - **Batching Helpers**: Facilitates easy-to-implement batching for non‑uniform sample sizes, a common issue in loss computation in the ADAS domain. +- **Multi-Tensor Copier**: Efficient copying of tensors in nested structures (lists, tuples, dicts) from + CPU to GPU. Automatically finds all tensors in the structure, asynchronously applies pinned memory staging, + packs small tensors into a single transfer, and performs other optimizations to significantly reduce the + overhead of copying many small, variable-size tensors e.g. typical in ADAS training meta-data. - **DALI Pipeline Framework**: Framework on top of [NVIDIA DALI](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/) that simplifies creation of pipelines for typical ADAS use‑cases and enables integration into existing training implementations with diff --git a/docs/index.rst b/docs/index.rst index 00bcedb..fb0219b 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -63,6 +63,7 @@ Please see the documentation of each namespace package for usage instructions (a contained_package_docs_mirror/on_demand_video_decoder/docs/index contained_package_docs_mirror/batching_helpers/docs/index + contained_package_docs_mirror/multi_tensor_copier/docs/index contained_package_docs_mirror/dali_pipeline_framework/docs/index contained_package_docs_mirror/draw_heatmap/docs/index contained_package_docs_mirror/optim_test_tools/docs/index diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index f360a3c..4d9d253 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -72,6 +72,7 @@ photometric randomization rtype dtype +dtypes uint functor prefetch @@ -185,6 +186,12 @@ literalinclude blockquote distributable posix +destructor +parallelly +dicts +enqueue +enqueued JIT prepend prepended +profiler diff --git a/namespace_packages_config.py b/namespace_packages_config.py index 61c1bae..a94ab18 100644 --- a/namespace_packages_config.py +++ b/namespace_packages_config.py @@ -25,6 +25,7 @@ # 'accvlab.example_skbuild_package', 'accvlab.on_demand_video_decoder', 'accvlab.batching_helpers', + 'accvlab.multi_tensor_copier', 'accvlab.dali_pipeline_framework', 'accvlab.draw_heatmap', 'accvlab.optim_test_tools', diff --git a/packages/batching_helpers/docs/example.rst b/packages/batching_helpers/docs/example.rst index 6929e55..7061887 100644 --- a/packages/batching_helpers/docs/example.rst +++ b/packages/batching_helpers/docs/example.rst @@ -1,5 +1,5 @@ Example -======== +======= Here, we provide an example of how to use the `batching-helpers` package to implement object detection loss, including diff --git a/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_functionality_test.py b/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_functionality_test.py index 0fea22f..1571155 100644 --- a/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_functionality_test.py +++ b/packages/dali_pipeline_framework/tests/pipeline/sample_data_group_functionality_test.py @@ -257,7 +257,7 @@ def test_set_data_from_dali_generic_iterator_output_with_flattened_names(): src["sequence"][0]["value"] = 0.0 src["sequence"][1]["value"] = 1.0 - # Simulate DALIGenericIterator output: a list of dicts keyed by flattened names + # Simulate DALIGenericIterator output: a list of dictionaries keyed by flattened names flattened_names = src.field_names_flat flat_values = src.get_data() iterator_like_output = [{name: value for name, value in zip(flattened_names, flat_values)}] diff --git a/packages/multi_tensor_copier/accvlab/multi_tensor_copier/__init__.py b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/__init__.py new file mode 100644 index 0000000..94c6820 --- /dev/null +++ b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/__init__.py @@ -0,0 +1,20 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .async_copy import AsyncCopyHandle, start_copy + +__all__ = [ + "AsyncCopyHandle", + "start_copy", +] diff --git a/packages/multi_tensor_copier/accvlab/multi_tensor_copier/async_copy.py b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/async_copy.py new file mode 100644 index 0000000..5760dea --- /dev/null +++ b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/async_copy.py @@ -0,0 +1,169 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import torch +import numpy as np + +from . import _ext + + +@dataclass +class AsyncCopyHandle: + """Handle to an in-progress asynchronous copy started by :func:`start_copy`. + + Use :meth:`ready` to poll for completion without blocking, or :meth:`get` to block until + the result is available. The handle must be kept alive until the copy is consumed or goes + out of scope; dropping it early triggers a synchronous wait in the destructor to ensure + staging buffers are not freed while transfers are in flight. + """ + + _h: Any + + def ready(self) -> bool: + """Return whether the copy has completed. + + Returns: + ``True`` if the copy has completed, ``False`` otherwise. + """ + return bool(self._h.ready()) + + def get(self) -> list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor: + """Block until the copy is done and return the result. + + The returned structure mirrors the input to :func:`start_copy`, with all tensors + copied to the target device. + + Returns: + The structure with the contained tensors copied to the target device (and numpy arrays replaced by + PyTorch tensors). + + Raises: + RuntimeError: If the copy fails + + """ + return self._h.get() + + +def start_copy( + data: list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor | np.ndarray, + device: str | torch.device, + *, + use_pinned_staging: bool = True, + pack_cpu_tensors: bool = True, + min_packed_alignment_bytes: int = 16, + max_packed_chunk_bytes: int = 32 * 1024 * 1024, + use_background_thread: bool = True, +) -> AsyncCopyHandle: + """Asynchronously copy tensors in a nested structure to ``device``. + + Traverses an arbitrarily nested combination of :class:`list`, :class:`tuple`, and :class:`dict` + containers, copies every :class:`torch.Tensor` and :class:`numpy.ndarray` (automatically converted to + PyTorch tensors) leaf to ``device``, and returns an :class:`AsyncCopyHandle` whose + :meth:`~AsyncCopyHandle.get` method yields the copied structure. The output preserves container types and + passes through non-tensor, non-container leaves (e.g. strings) unchanged. + + The primary optimization target is **CPU → GPU** transfers of many small tensors in non-pinned + memory. Other copy directions (GPU → CPU, GPU → GPU, CPU → CPU) are supported and benefit from + some optimizations (e.g. background-thread scheduling for all directions, parallel pinned staging for + D2H), but are not the main focus. + + .. note:: + + The input tensors do not need to all be on the same device, copying tensors from different devices is + supported. If some tensors are already on the target device, they will be re-used as is. + + .. important:: + + Packing of small tensors (see ``pack_cpu_tensors`` parameter below) is a major contribution to the + overall performance optimization vs. using standard PyTorch ``.to()`` calls on the individual tensors. + For this optimization to be applied, the input CPU tensors must be contiguous. + + .. warning:: + + The caller must not **free** or **modify in-place** any input tensors until the copy has completed + (i.e. until :meth:`~AsyncCopyHandle.get` returns or :meth:`~AsyncCopyHandle.ready` returns ``True``). + Because copies are submitted asynchronously — potentially on a background thread — input tensor memory + may still be read by the GPU after this function returns. Violating this contract leads to undefined + behavior (silent data corruption, stale reads, or CUDA errors). + + Important: + Only :class:`list`, :class:`tuple`, and :class:`dict` are recognized as container types. + Other container-like objects (e.g. custom classes, named tuples) are treated as opaque + leaves and returned unchanged; any tensors nested inside them will **not** be copied. + + Args: + data: The structure to copy. May be a single :class:`torch.Tensor` or + :class:`numpy.ndarray`, or a nested :class:`list`/:class:`tuple`/:class:`dict` objects containing + tensor/numpy arrays. + device: Target PyTorch device (e.g. ``"cuda:0"``, ``"cpu"``). + use_pinned_staging: When ``True``, allocate pinned (page-locked) host buffers as + intermediate staging for CPU → CUDA and CUDA → CPU transfers. For H2D this enables + ``non_blocking`` copies; for D2H the pinned buffer **is** the returned output tensor. + Has no effect on CPU → CPU or GPU → GPU copies. + pack_cpu_tensors: When ``True``, pack multiple small contiguous CPU tensors (≤ 256 KB + each, mixed dtypes supported) into one or more staging buffers (each at most + ``max_packed_chunk_bytes``) and issue one H2D transfer per chunk instead of per tensor. + Only applies to CPU → CUDA copies. + min_packed_alignment_bytes: Minimum byte-alignment of each tensor's start offset within + the packed buffer. The effective alignment for each tensor which participates in the packing is + ``max(min_packed_alignment_bytes, tensor.element_size())``. + max_packed_chunk_bytes: Maximum payload size in bytes of each packed staging chunk (tensor + data plus inter-tensor alignment padding; the actual allocation may be slightly larger to + satisfy buffer-start alignment). When the total packed data exceeds this limit, multiple + packed chunks are allocated and transferred. Defaults to 32 MB. + use_background_thread: When ``True``, the copy orchestration (buffer allocation, staging, + and CUDA copy submission) runs on a C++ background thread (from a shared pool) so that this + function returns before the copies complete. Note that CPU staging is done parallelly regardless + of this setting. Benefits all copy directions. + + Returns: + Handle to the in-progress copy. Call :meth:`~AsyncCopyHandle.get` to block until completion + and retrieve the result, or :meth:`~AsyncCopyHandle.ready` to poll without blocking. + + Raises: + RuntimeError: If the copy fails (propagated on :meth:`~AsyncCopyHandle.get`). + + Examples: + Copy a nested structure of tensors to the GPU:: + + data = [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6])] + handle = start_copy(data, "cuda:0") + # ... do other work ... + result = handle.get() # [tensor([1,2,3], device='cuda:0'), ...] + + Convert numpy arrays to CPU tensors (makes use of the fact that numpy arrays can be used as inputs and + benefits from background-thread scheduling):: + + data = [np.array([1, 2, 3]), np.array([4, 5, 6])] + handle = start_copy(data, "cpu") + result = handle.get() # [tensor([1, 2, 3]), tensor([4, 5, 6])] + """ + dev = torch.device(device) + h = _ext.start_copy( + data, + str(dev), + bool(use_pinned_staging), + bool(use_background_thread), + bool(pack_cpu_tensors), + int(min_packed_alignment_bytes), + int(max_packed_chunk_bytes), + ) + handle = AsyncCopyHandle(h) + return handle diff --git a/packages/multi_tensor_copier/accvlab/multi_tensor_copier/csrc/multi_tensor_copier.cpp b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/csrc/multi_tensor_copier.cpp new file mode 100644 index 0000000..4b8fac7 --- /dev/null +++ b/packages/multi_tensor_copier/accvlab/multi_tensor_copier/csrc/multi_tensor_copier.cpp @@ -0,0 +1,1196 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace py = pybind11; + +namespace { + +enum class NodeKind { + List, + Tuple, + Dict, + TensorLeaf, + Passthrough, +}; + +// Minimal PyTree representation of the input structure. +// +// - Lists/tuples store children in `seq` (and preserve tuple/list kind). +// - Dicts store ordered key/value pairs in `items` (keys preserved as Python objects). +// - Tensor leaves store `tensor_idx` (index into CopyState.inputs/outputs). +// - Passthrough leaves store the original Python object and are returned unchanged in output. +struct Node { + NodeKind kind{NodeKind::Passthrough}; + + std::vector seq; + std::vector> items; + + size_t tensor_idx{0}; + py::object obj; +}; + +// Minimal RAII wrapper around a CUDA event. +// +// Events are used to track completion of all enqueued copies on a CUDA stream, +// so `ready()` can poll and `get()` / destructor can wait before releasing +// staging buffers. +struct CudaEvent { + cudaEvent_t ev{nullptr}; + int device_index{-1}; + + CudaEvent() = default; + CudaEvent(cudaEvent_t e, int dev) : ev(e), device_index(dev) {} + + CudaEvent(const CudaEvent&) = delete; + CudaEvent& operator=(const CudaEvent&) = delete; + + CudaEvent(CudaEvent&& other) noexcept { + ev = other.ev; + device_index = other.device_index; + other.ev = nullptr; + other.device_index = -1; + } + CudaEvent& operator=(CudaEvent&& other) noexcept { + if (this != &other) { + cleanup_no_throw(); + ev = other.ev; + device_index = other.device_index; + other.ev = nullptr; + other.device_index = -1; + } + return *this; + } + + ~CudaEvent() { cleanup_no_throw(); } + + void cleanup_no_throw() noexcept { + if (ev != nullptr) { + // cudaEventDestroy does not require the Python GIL. + cudaEventDestroy(ev); + ev = nullptr; + device_index = -1; + } + } +}; + +struct PyConversionCtx { + py::object numpy_ndarray_type; + py::object numpy_ascontiguousarray; + py::object torch_from_numpy; +}; + +static PyConversionCtx make_py_conversion_ctx_from_cache(const py::dict& cache) { + // Called with GIL held. `cache` is a Python-owned dict stored on the module object. + PyConversionCtx ctx; + ctx.numpy_ndarray_type = cache["numpy_ndarray_type"]; + ctx.numpy_ascontiguousarray = cache["numpy_ascontiguousarray"]; + ctx.torch_from_numpy = cache["torch_from_numpy"]; + return ctx; +} + +static Node make_tensor_leaf_node(const std::vector& inputs) { + Node out; + out.kind = NodeKind::TensorLeaf; + out.tensor_idx = inputs.size() - 1; + return out; +} + +static Node traverse_build_tree_impl(const py::handle& obj, const PyConversionCtx& ctx, + std::vector& inputs); + +static Node traverse_sequence(NodeKind kind, ssize_t n, const std::function& get_item, + const PyConversionCtx& ctx, std::vector& inputs) { + Node out; + out.kind = kind; + out.seq.reserve(static_cast(n)); + for (ssize_t i = 0; i < n; ++i) { + py::handle item = get_item(i); + out.seq.push_back(traverse_build_tree_impl(item, ctx, inputs)); + } + return out; +} + +// Traverse an input Python object and build: +// - a PyTree describing container structure (lists/tuples/dicts), +// - a flat list of tensor leaves (`inputs`) (numpy arrays are converted to tensors), +// - passthrough leaves stored as Python objects and returned unchanged. +// +// Invariant: TensorLeaf nodes store indices aligned between `inputs[i]` and `outputs[i]`. +// +// NOTE: Must be called with the GIL held (touches Python objects). +static Node traverse_build_tree_impl(const py::handle& obj, const PyConversionCtx& ctx, + std::vector& inputs) { + // Containers + if (py::isinstance(obj)) { + auto in_list = py::reinterpret_borrow(obj); + const ssize_t n = static_cast(in_list.size()); + const Node out = traverse_sequence( + NodeKind::List, n, [&](ssize_t i) { return in_list[i]; }, ctx, inputs); + return out; + } + + if (py::isinstance(obj)) { + auto in_tup = py::reinterpret_borrow(obj); + const ssize_t n = static_cast(in_tup.size()); + const Node out = traverse_sequence( + NodeKind::Tuple, n, [&](ssize_t i) { return in_tup[i]; }, ctx, inputs); + return out; + } + + if (py::isinstance(obj)) { + auto in_dict = py::reinterpret_borrow(obj); + Node out; + out.kind = NodeKind::Dict; + out.items.reserve(static_cast(in_dict.size())); + for (auto kv : in_dict) { + py::object k = py::reinterpret_borrow(kv.first); + py::handle v = kv.second; + out.items.emplace_back(std::move(k), traverse_build_tree_impl(v, ctx, inputs)); + } + return out; + } + + // Tensor leaves + if (py::isinstance(obj, ctx.numpy_ndarray_type)) { + py::object tensor_obj; + try { + tensor_obj = ctx.torch_from_numpy(obj); // usually zero-copy + } catch (const py::error_already_set&) { + // If direct conversion fails (e.g., unusual strides), make a contiguous copy and retry. + py::object contiguous = ctx.numpy_ascontiguousarray(obj); + tensor_obj = ctx.torch_from_numpy(contiguous); + } + inputs.push_back(py::cast(tensor_obj)); + const Node out = make_tensor_leaf_node(inputs); + return out; + } + + try { + inputs.push_back(py::cast(obj)); + const Node out = make_tensor_leaf_node(inputs); + return out; + } catch (const py::cast_error&) { + // Passthrough: return the original object unchanged. + Node out; + out.kind = NodeKind::Passthrough; + out.obj = py::reinterpret_borrow(obj); + return out; + } +} + +// Parse a device string (e.g. "cpu", "cuda:0") into a c10::Device and convert errors +// into a user-friendly runtime_error for Python. +static c10::Device parse_device(const std::string& device_str) { + try { + c10::Device dev(device_str); + return dev; + } catch (const c10::Error& /*e*/) { + throw std::runtime_error(std::string("Invalid device string: '") + device_str + "'"); + } +} + +} // namespace + +namespace { + +struct CopyState { + // Flattened tensor leaves extracted during input traversal (GIL-held). + std::vector inputs; + // Outputs aligned with `inputs` (same indexing). + std::vector outputs; + // Optional per-tensor pinned staging buffers (only used when pinning is enabled + // and the tensor is not part of the packed buffer). + std::vector pinned_buffers; + // Per-chunk packed CPU staging. Each chunk has an over-sized allocation (full) and an + // aligned narrow view (the actual buffer used for memcpy / H2D). + std::vector packed_cpu_chunks_full; + std::vector packed_cpu_chunks; + // Completion events for CUDA submission. + std::vector events; + + c10::Device target_device{c10::kCPU}; + bool use_pinned_staging{true}; + bool pack_cpu_tensors{true}; + // Minimum alignment (bytes) for each tensor start within the packed buffer. + // Effective per-tensor alignment is max(min_packed_alignment_bytes, element_size()). + int64_t min_packed_alignment_bytes{16}; + // Maximum bytes per packed chunk. When the total packed data exceeds this, multiple + // chunks are allocated, each transferred with its own H2D copy. + int64_t max_packed_chunk_bytes{32 * 1024 * 1024}; + + // CUDA streams captured at call time (on the user's thread) so that work enqueued by + // the copier is correctly ordered with respect to the user's preceding GPU operations. + // synchronize_source_streams() uses these to establish all necessary cross-stream + // dependencies before any per-tensor copies are enqueued. + std::optional target_stream; // target CUDA device (H2D / D2D) + bool target_stream_used{false}; + std::unordered_map src_streams; // per source CUDA device + // Events recorded at capture time on each source stream. For D2D, + // synchronize_source_streams makes the target stream wait on these, pinning the + // sync point to the moment start_copy was called (not whenever the background thread + // happens to run). Cleaned up automatically via CudaEvent RAII. + std::unordered_map src_capture_events; + // Source device indices whose streams received copy work (populated by + // synchronize_source_streams; used for completion-event recording). + std::vector src_streams_used; + + // If scheduling fails in the background task, the exception is captured here + // and rethrown in `ready()` / `get()`. + std::exception_ptr exc; + + // Completion signal for background submission. + // `done` becomes ready once staging + copy submission has finished (or failed). + std::shared_future done; +}; + +class CopyThreadPool { + public: + static CopyThreadPool& instance() { + static CopyThreadPool pool; + return pool; + } + + // Enqueue a background task. Tasks MUST NOT call Python APIs (no GIL); only ATen/CUDA code is allowed. + void enqueue(std::function fn) { + { + std::lock_guard lk(mutex_); + tasks_.push_back(std::move(fn)); + } + cond_var_.notify_one(); + } + + private: + // Create a small, process-wide worker pool to overlap CPU staging/CUDA submission with Python work. + CopyThreadPool() { + const unsigned hardware_conc = std::thread::hardware_concurrency(); + const unsigned n_workers = std::max(1u, std::min(4u, (hardware_conc == 0u ? 4u : hardware_conc))); + workers_.reserve(n_workers); + for (unsigned i = 0; i < n_workers; ++i) { + workers_.emplace_back([this]() { this->worker_loop(); }); + } + } + + ~CopyThreadPool() { + { + std::lock_guard lk(mutex_); + stop_ = true; + } + cond_var_.notify_all(); + for (auto& t : workers_) { + if (t.joinable()) { + t.join(); + } + } + } + + void worker_loop() { + while (true) { + std::function fn; + { + std::unique_lock lock(mutex_); + cond_var_.wait(lock, [&]() { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) { + return; + } + fn = std::move(tasks_.front()); + tasks_.pop_front(); + } + fn(); + } + } + + std::mutex mutex_; + std::condition_variable cond_var_; + std::deque> tasks_; + std::vector workers_; + bool stop_{false}; +}; + +// A precomputed plan for the optional "pack many small CPU tensors into one staging buffer" fast path. +// When enabled, multiple small contiguous CPU tensors (mix of different dtypes allowed) are copied into a +// single packed *byte* buffer (pinned or pageable), transferred with a single H2D, and then reconstructed as +// per-tensor views sharing the packed GPU storage. +// +// For each input i: byte_offset_by_input[i] is the starting *byte* offset inside the packed buffer, +// or -1 if this input is not packed. +struct PackPlan { + // For each input leaf i: starting byte offset inside its chunk, or -1 if not packed. + // IMPORTANT: This is checked for all tensors, not only the packed ones, and not only if packing is + // enabled. Therefore, it has to be initialized with -1 for all non-packed inputs. + std::vector byte_offset_by_input; // -1 => not packed + // For each input leaf i: which chunk it belongs to, or -1 if not packed. + std::vector chunk_index_by_input; // -1 => not packed + // Byte size of each chunk (one entry per chunk). + std::vector chunk_sizes; + // Whether packing is enabled for this call (if false, treat everything as "not packed"). + bool enabled{false}; +}; + +static inline int64_t round_up_i64(int64_t x, int64_t a) { + if (a <= 1) { + return x; + } + const int64_t rem = x % a; + const int64_t res = rem == 0 ? x : (x + (a - rem)); + return res; +} + +static inline int64_t next_pow2_i64(int64_t x) { + if (x <= 1) { + return 1; + } + // Round up to the next power of two (clamped to int64 range). + uint64_t v = static_cast(x - 1); + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + v += 1; + if (v > static_cast(std::numeric_limits::max())) { + return std::numeric_limits::max(); + } + return static_cast(v); +} + +static inline int64_t packed_buffer_alignment_bytes(const CopyState& cs) { + // Ensure the packed buffer itself is aligned to at least this many bytes. + // Also round up to a power-of-two so we can use bit-masking for pointer alignment. + const int64_t requested = std::max(16, cs.min_packed_alignment_bytes); + return next_pow2_i64(requested); +} + +static inline int64_t aligned_slice_offset_bytes(const void* base_ptr, int64_t alignment_pow2) { + if (alignment_pow2 <= 1) { + return 0; + } + const uintptr_t base = reinterpret_cast(base_ptr); + const uintptr_t a = static_cast(alignment_pow2); + const uintptr_t aligned = (base + (a - 1)) & ~(a - 1); + int64_t off = static_cast(aligned - base); + return off; +} + +// Bucket ordering key: clamp a required alignment to {16,8,4,2,1} by rounding down to the nearest bucket <= 16. +// IMPORTANT: This is only used for ordering; the actual per-tensor alignment is preserved separately. +static inline int64_t pack_bucket_key(int64_t required_align) { + if (required_align >= 16) { + return 16; + } + if (required_align >= 8) { + return 8; + } + if (required_align >= 4) { + return 4; + } + if (required_align >= 2) { + return 2; + } + return 1; +} + +// Packing candidate: an input tensor i that can be packed into the CPU staging byte buffer. +struct PackCandidate { + size_t idx; + int64_t bytes; + int64_t required_align; +}; + +// Buckets of packing candidates in descending alignment order. +// Note: complex128 element_size() is 16, complex64 is 8. +struct PackBuckets { + std::vector a16; + std::vector a8; + std::vector a4; + std::vector a2; + std::vector a1; + + void add(PackCandidate c) { + switch (pack_bucket_key(c.required_align)) { + case 16: + a16.push_back(c); + break; + case 8: + a8.push_back(c); + break; + case 4: + a4.push_back(c); + break; + case 2: + a2.push_back(c); + break; + default: + a1.push_back(c); + break; + } + } + + template + void for_each_bucket_desc(F&& f) const { + f(16, a16); + f(8, a8); + f(4, a4); + f(2, a2); + f(1, a1); + } +}; + +static std::optional make_pack_candidate(const CopyState& copy_state, size_t i, + int64_t min_align) { + // Heuristic thresholds: only pack "small" tensors. + constexpr int64_t kPackMaxBytesPerTensor = 256 * 1024; // 256KB + + const auto& in = copy_state.inputs[i]; + // Only consider CPU tensors that will be transferred to CUDA. + if (!in.device().is_cpu() || in.device() == copy_state.target_device) { + return std::nullopt; + } + // Packing requires a flat contiguous view. + if (!in.is_contiguous()) { + return std::nullopt; + } + const int64_t bytes = in.numel() * in.element_size(); + // Skip tensors that are too big; packing targets "many tiny tensors" overhead. + if (bytes == 0 || bytes > kPackMaxBytesPerTensor) { + return std::nullopt; + } + const int64_t elem_sz = static_cast(in.element_size()); + // Effective alignment must be >= requested minimum AND must guarantee element alignment. + // If min_align is not a multiple of elem_sz, round up to the next multiple to preserve + // the invariant that byte_offset % elem_sz == 0. + int64_t required_align = std::max(min_align, elem_sz); + required_align = round_up_i64(required_align, elem_sz); + return PackCandidate{i, bytes, required_align}; +} + +// Assign byte offsets within chunked packed buffers for each candidate tensor, processing +// alignment buckets in descending order to minimise inter-tensor padding. When a tensor +// would exceed `max_chunk_bytes` in the current chunk, a new chunk is started. Populates +// `pack_plan.byte_offset_by_input`, `chunk_index_by_input`, and `chunk_sizes`. +static void layout_packed_offsets(const PackBuckets& buckets, PackPlan& pack_plan, int64_t& packed_count, + int64_t max_chunk_bytes) { + int64_t cursor = 0; + int64_t chunk_idx = 0; + packed_count = 0; + + auto finalize_chunk = [&]() { + if (cursor > 0) { + pack_plan.chunk_sizes.push_back(cursor); + cursor = 0; + ++chunk_idx; + } + }; + + auto pack_bucket = [&](int64_t bucket_align, const std::vector& bucket) { + if (bucket.empty()) { + return; + } + for (const auto& c : bucket) { + int64_t aligned_cursor = round_up_i64(cursor, c.required_align); + if (aligned_cursor + c.bytes > max_chunk_bytes && cursor > 0) { + finalize_chunk(); + aligned_cursor = round_up_i64(cursor, c.required_align); + } + cursor = aligned_cursor; + pack_plan.byte_offset_by_input[c.idx] = cursor; + pack_plan.chunk_index_by_input[c.idx] = chunk_idx; + cursor += c.bytes; + packed_count += 1; + } + }; + + buckets.for_each_bucket_desc(pack_bucket); + if (cursor > 0) { + pack_plan.chunk_sizes.push_back(cursor); + } +} + +// Decide whether to enable the packed-CPU-tensors fast path and, if enabled, compute +// per-tensor chunk assignments and byte offsets within each chunk. +static PackPlan compute_pack_plan(const CopyState& copy_state) { + PackPlan pack_plan; + const size_t n = copy_state.inputs.size(); + pack_plan.byte_offset_by_input.assign(n, -1); + pack_plan.chunk_index_by_input.assign(n, -1); + + if (!copy_state.pack_cpu_tensors || !copy_state.target_device.is_cuda()) { + return pack_plan; + } + + // We only pack tensors that: + // - are on CPU (and not already on the target device), + // - are contiguous (so we can treat them as a flat buffer), + // - are "small enough" individually, + // + // Mixed-dtype packing: we pack raw bytes and reconstruct typed tensors as views + // sharing the packed GPU storage. + const int64_t min_align = std::max(1, copy_state.min_packed_alignment_bytes); + PackBuckets buckets; + for (size_t i = 0; i < n; ++i) { + if (auto cand = make_pack_candidate(copy_state, i, min_align)) { + buckets.add(*cand); + } + } + + int64_t packed_count = 0; + layout_packed_offsets(buckets, pack_plan, packed_count, copy_state.max_packed_chunk_bytes); + + if (packed_count >= 2 && !pack_plan.chunk_sizes.empty()) { + pack_plan.enabled = true; + } else { + pack_plan.enabled = false; + pack_plan.chunk_sizes.clear(); + std::fill(pack_plan.byte_offset_by_input.begin(), pack_plan.byte_offset_by_input.end(), -1); + std::fill(pack_plan.chunk_index_by_input.begin(), pack_plan.chunk_index_by_input.end(), -1); + } + return pack_plan; +} + +// Allocate staging buffers required by the chosen plans: +// - packed CPU buffer (pinned or pageable) if packing is enabled +// - per-tensor pinned buffers for remaining CPU tensors if pinning is enabled +// +// This function only allocates; it does not perform data copies. +static void allocate_staging_buffers(CopyState& copy_state, const PackPlan& pack_plan) { + // Per-chunk packed CPU buffers (pinned or pageable). + if (pack_plan.enabled) { + auto opts = at::TensorOptions().dtype(at::kByte).device(c10::kCPU).pinned_memory( + copy_state.use_pinned_staging); + const int64_t alignment = packed_buffer_alignment_bytes(copy_state); + copy_state.packed_cpu_chunks_full.reserve(pack_plan.chunk_sizes.size()); + copy_state.packed_cpu_chunks.reserve(pack_plan.chunk_sizes.size()); + for (int64_t chunk_bytes : pack_plan.chunk_sizes) { + const int64_t total_alloc = chunk_bytes + alignment - 1; + auto full = at::empty({total_alloc}, opts); + const int64_t off = aligned_slice_offset_bytes(full.data_ptr(), alignment); + copy_state.packed_cpu_chunks.push_back(full.narrow(0, off, chunk_bytes)); + copy_state.packed_cpu_chunks_full.push_back(std::move(full)); + } + } + + // Per-tensor pinned buffers for CPU tensors not covered by packing (CPU->CUDA). + if (copy_state.use_pinned_staging && copy_state.target_device.is_cuda()) { + for (size_t i = 0; i < copy_state.inputs.size(); ++i) { + const auto& in = copy_state.inputs[i]; + if (!in.device().is_cpu() || in.device() == copy_state.target_device) { + continue; + } + if (pack_plan.byte_offset_by_input[i] >= 0) { + continue; + } + auto opts = in.options().device(c10::kCPU).pinned_memory(true); + copy_state.pinned_buffers[i] = at::empty(in.sizes(), opts); + } + } + + // Per-tensor pinned buffers for CUDA tensors when target is CPU (CUDA->CPU / D2H). + // Pinned host memory enables non_blocking D2H copies on a CUDA stream. + if (copy_state.use_pinned_staging && !copy_state.target_device.is_cuda()) { + for (size_t i = 0; i < copy_state.inputs.size(); ++i) { + const auto& in = copy_state.inputs[i]; + if (!in.device().is_cuda()) { + continue; + } + auto opts = in.options().device(c10::kCPU).pinned_memory(true); + copy_state.pinned_buffers[i] = at::empty(in.sizes(), opts); + } + } +} + +// Fill CPU staging buffers (packed buffer slices and/or per-tensor pinned buffers). +// +// This is the CPU-heavy part and is parallelized via at::parallel_for. +// It does NOT enqueue CUDA transfers; it only prepares source buffers. +static void fill_cpu_staging_buffers(CopyState& copy_state, const PackPlan& pack_plan) { + if (!copy_state.target_device.is_cuda()) { + return; + } + + auto stage_one = [&](size_t i) -> void { + const auto& in = copy_state.inputs[i]; + if (!in.device().is_cpu() || in.device() == copy_state.target_device) { + return; + } + const int64_t off = pack_plan.byte_offset_by_input[i]; + if (off >= 0) { + const int64_t chunk_idx = pack_plan.chunk_index_by_input[i]; + const int64_t bytes = in.numel() * in.element_size(); + auto* dst = static_cast( + copy_state.packed_cpu_chunks[static_cast(chunk_idx)].data_ptr()) + + off; + const auto* src = static_cast(in.data_ptr()); + std::memcpy(dst, src, static_cast(bytes)); + return; + } + auto& pinned = copy_state.pinned_buffers[i]; + if (pinned.defined()) { + pinned.copy_(in, /*non_blocking=*/false); + } + }; + + at::parallel_for(0, static_cast(copy_state.inputs.size()), 1, [&](int64_t begin, int64_t end) { + for (int64_t ii = begin; ii < end; ++ii) { + stage_one(static_cast(ii)); + } + }); +} + +// Enqueue packed CPU->CUDA transfers (one H2D per chunk) and populate copy_state.outputs[i] +// with GPU views/slices into the corresponding chunk's GPU buffer. +static void enqueue_packed_transfer(CopyState& copy_state, const PackPlan& pack_plan) { + if (!copy_state.target_device.is_cuda() || copy_state.packed_cpu_chunks.empty() || + !copy_state.target_stream.has_value()) { + return; + } + const auto stream = *copy_state.target_stream; + c10::cuda::CUDAGuard guard(stream.device_index()); + at::cuda::CUDAStreamGuard stream_guard(stream); + copy_state.target_stream_used = true; + + auto gpu_opts = at::TensorOptions().dtype(at::kByte).device(copy_state.target_device); + const int64_t alignment = packed_buffer_alignment_bytes(copy_state); + + // Allocate GPU buffers and enqueue H2D copies for each chunk. + const size_t num_chunks = pack_plan.chunk_sizes.size(); + std::vector gpu_chunks(num_chunks); + std::vector gpu_base_offsets(num_chunks); + for (size_t c = 0; c < num_chunks; ++c) { + const int64_t chunk_bytes = pack_plan.chunk_sizes[c]; + const int64_t total_alloc = chunk_bytes + alignment - 1; + auto gpu_full = at::empty({total_alloc}, gpu_opts); + const int64_t base_off = aligned_slice_offset_bytes(gpu_full.data_ptr(), alignment); + gpu_chunks[c] = gpu_full.narrow(0, base_off, chunk_bytes); + gpu_base_offsets[c] = base_off; + gpu_chunks[c].copy_(copy_state.packed_cpu_chunks[c], + /*non_blocking=*/copy_state.use_pinned_staging); + } + + // Create per-tensor output views referencing the correct chunk's GPU storage. + for (size_t i = 0; i < copy_state.inputs.size(); ++i) { + const int64_t off = pack_plan.byte_offset_by_input[i]; + if (off < 0) { + continue; + } + const auto chunk_idx = static_cast(pack_plan.chunk_index_by_input[i]); + const int64_t base_off = gpu_base_offsets[chunk_idx]; + const auto& in = copy_state.inputs[i]; + const int64_t elem_sz = static_cast(in.element_size()); + if (elem_sz <= 0 || ((base_off + off) % elem_sz) != 0) { + throw std::runtime_error( + "Packed buffer alignment invariant violated (byte offset not divisible by element size)."); + } + const int64_t storage_off_elems = (base_off + off) / elem_sz; + auto out = at::empty({0}, in.options().device(copy_state.target_device)); + out.set_(gpu_chunks[chunk_idx].storage(), storage_off_elems, in.sizes(), in.strides()); + copy_state.outputs[i] = out; + } +} + +// Single synchronization point for all captured source CUDA streams, regardless of copy +// direction. Called once before any per-tensor copies are enqueued. +// +// - D2H (CPU target, CUDA sources): copies will run on the source streams, so ordering with +// prior GPU work is implicit. We mark these streams as used for completion-event tracking. +// - D2D (CUDA target, source on a different device): copies run on the target stream, so we +// make it wait on the capture-time event recorded in start_copy_impl. This pins the sync +// point to the moment start_copy was called, regardless of background-thread timing. +// - H2D / CPU→CPU: no source CUDA streams; nothing to do. +static void synchronize_source_streams(CopyState& copy_state) { + for (const auto& [src_dev_idx, src_stream] : copy_state.src_streams) { + if (!copy_state.target_device.is_cuda()) { + // D2H: work runs on the source stream — implicit ordering with prior GPU work. + copy_state.src_streams_used.push_back(src_dev_idx); + continue; + } + if (src_dev_idx == static_cast(copy_state.target_stream->device_index())) { + continue; // same device as target — tensors reused as-is + } + // D2D: target stream waits on the capture-time event from the source stream. + auto ev_it = copy_state.src_capture_events.find(src_dev_idx); + if (ev_it == copy_state.src_capture_events.end() || ev_it->second.ev == nullptr) { + throw std::runtime_error("Internal error: no capture event for source CUDA device " + + std::to_string(src_dev_idx)); + } + auto st = cudaStreamWaitEvent(copy_state.target_stream->stream(), ev_it->second.ev, 0); + if (st != cudaSuccess) { + throw std::runtime_error(std::string("cudaStreamWaitEvent failed: ") + cudaGetErrorString(st)); + } + } +} + +// Enqueue per-tensor transfers for all leaves not handled by packing. +// synchronize_source_streams() must be called before this function. +// +// Behavior summary: +// - tensors already on target device are reused +// - CUDA->CPU (D2H): copies run on the captured source-device stream. With pinned staging, +// an async D2H into the pinned buffer is used; otherwise a synchronous `.to()`. +// - CPU->CPU: synchronous `.to()` +// - *->CUDA: copies run on the captured target stream. D2D is safe because +// synchronize_source_streams inserted the necessary cross-device event waits. +// Pinned staging enables non_blocking H2D. +static void enqueue_per_tensor_transfers(CopyState& copy_state, const PackPlan& pack_plan) { + for (size_t i = 0; i < copy_state.inputs.size(); ++i) { + const auto& in = copy_state.inputs[i]; + + if (pack_plan.byte_offset_by_input[i] >= 0) { + continue; // handled by packed transfer + } + + if (in.device() == copy_state.target_device) { + copy_state.outputs[i] = in; + continue; + } + + // --- non-CUDA target (D2H / CPU→CPU) --- + if (!copy_state.target_device.is_cuda()) { + if (in.device().is_cuda()) { + const auto src_dev_idx = static_cast(in.device().index()); + const auto& stream = copy_state.src_streams.at(src_dev_idx); + c10::cuda::CUDAGuard guard(static_cast(src_dev_idx)); + at::cuda::CUDAStreamGuard stream_guard(stream); + if (copy_state.use_pinned_staging && copy_state.pinned_buffers[i].defined()) { + copy_state.pinned_buffers[i].copy_(in, /*non_blocking=*/true); + copy_state.outputs[i] = copy_state.pinned_buffers[i]; + } else { + copy_state.outputs[i] = in.to(copy_state.target_device); + } + } else { + copy_state.outputs[i] = in.to(copy_state.target_device); + } + continue; + } + + // --- CUDA target (H2D / D2D) --- + const auto target_stream = *copy_state.target_stream; + c10::cuda::CUDAGuard guard(target_stream.device_index()); + at::cuda::CUDAStreamGuard stream_guard(target_stream); + copy_state.target_stream_used = true; + + copy_state.outputs[i] = at::empty(in.sizes(), in.options().device(copy_state.target_device)); + if (copy_state.use_pinned_staging && in.device().is_cpu() && copy_state.pinned_buffers[i].defined()) { + copy_state.outputs[i].copy_(copy_state.pinned_buffers[i], /*non_blocking=*/true); + } else { + copy_state.outputs[i].copy_(in, /*non_blocking=*/in.device().is_cuda()); + } + } +} + +static void record_event_on_stream(CopyState& copy_state, at::cuda::CUDAStream stream, int dev_idx) { + c10::cuda::CUDAGuard guard(static_cast(dev_idx)); + cudaEvent_t ev = nullptr; + const auto st_create = cudaEventCreateWithFlags(&ev, cudaEventDisableTiming); + if (st_create != cudaSuccess) { + throw std::runtime_error(std::string("cudaEventCreateWithFlags failed: ") + + cudaGetErrorString(st_create)); + } + const auto st_rec = cudaEventRecord(ev, stream.stream()); + if (st_rec != cudaSuccess) { + cudaEventDestroy(ev); + throw std::runtime_error(std::string("cudaEventRecord failed: ") + cudaGetErrorString(st_rec)); + } + copy_state.events.emplace_back(ev, dev_idx); +} + +// Record completion events on all streams that received work, so that +// `ready()`/`get()`/destructor can wait for all enqueued copies to complete. +// src_streams_used is populated by synchronize_source_streams (unique keys). +static void record_completion_events(CopyState& copy_state) { + if (copy_state.target_stream_used && copy_state.target_stream.has_value()) { + const auto stream = *copy_state.target_stream; + record_event_on_stream(copy_state, stream, static_cast(stream.device_index())); + } + for (int dev_idx : copy_state.src_streams_used) { + record_event_on_stream(copy_state, copy_state.src_streams.at(dev_idx), dev_idx); + } +} + +// Orchestrate the full copy scheduling: +// - compute packing plan +// - allocate + fill CPU staging buffers +// - enqueue packed transfer (optional) +// - synchronize all source CUDA streams (single sync point for D2H / D2D) +// - enqueue remaining per-tensor transfers +// - record completion events on streams that received work +// +// Uses the CUDA streams captured at call time (stored in CopyState) to ensure correct +// ordering with respect to the user's preceding GPU operations. +// +// Called without the GIL (either on a background pool thread or under gil_scoped_release). +static void schedule_copies(CopyState& copy_state) { + copy_state.outputs.assign(copy_state.inputs.size(), at::Tensor()); + copy_state.pinned_buffers.assign(copy_state.inputs.size(), at::Tensor()); + copy_state.packed_cpu_chunks_full.clear(); + copy_state.packed_cpu_chunks.clear(); + copy_state.events.clear(); + copy_state.target_stream_used = false; + copy_state.src_streams_used.clear(); + + PackPlan pack_plan = compute_pack_plan(copy_state); + + allocate_staging_buffers(copy_state, pack_plan); + fill_cpu_staging_buffers(copy_state, pack_plan); + + if (pack_plan.enabled) { + enqueue_packed_transfer(copy_state, pack_plan); + } + synchronize_source_streams(copy_state); + enqueue_per_tensor_transfers(copy_state, pack_plan); + record_completion_events(copy_state); +} + +template +static void for_each_cuda_event(const std::vector& events, F&& action) { + for (const auto& e : events) { + if (e.ev == nullptr) { + continue; + } + c10::cuda::CUDAGuard guard(static_cast(e.device_index)); + action(e.ev); + } +} + +template +static bool for_each_cuda_event_while_action_succeeds(const std::vector& events, F&& action) { + for (const auto& e : events) { + if (e.ev == nullptr) { + continue; + } + c10::cuda::CUDAGuard guard(static_cast(e.device_index)); + if (!action(e.ev)) { + return false; + } + } + return true; +} + +} // namespace + +// Python-facing handle representing an in-flight copy. +// +// Owns: +// - a PyTree describing the output structure, +// - a shared CopyState containing inputs/staging/outputs/events and a completion future. +// +// Semantics: +// - `ready()` is non-blocking (polls completion future + CUDA events) +// - `get()` blocks until submission finishes and CUDA work completes, then reconstructs the output +// - destructor is conservative: waits for completion and best-effort syncs CUDA to keep lifetime safe +class AsyncCopyHandle { + public: + AsyncCopyHandle(Node root, std::shared_ptr copy_state) + : root_(std::move(root)), copy_state_(std::move(copy_state)) {} + + AsyncCopyHandle(const AsyncCopyHandle&) = delete; + AsyncCopyHandle& operator=(const AsyncCopyHandle&) = delete; + AsyncCopyHandle(AsyncCopyHandle&&) = default; + AsyncCopyHandle& operator=(AsyncCopyHandle&&) = default; + + ~AsyncCopyHandle() { + // Best-effort cleanup; must not throw. + cleanup_no_throw(); + + // Ensure Python refcounts are decremented with the GIL held. + if (Py_IsInitialized()) { + py::gil_scoped_acquire gil; + // Drop any Python references held by the PyTree. + root_ = Node(); + } + // tensors + cuda events clean up without requiring GIL. + } + + bool ready() const { + if (!copy_state_ || !copy_state_->done.valid()) { + return false; + } + if (copy_state_->done.wait_for(std::chrono::seconds(0)) != std::future_status::ready) { + return false; + } + rethrow_if_async_failed(*copy_state_); + const bool out = cuda_events_ready(copy_state_->events); + return out; + } + + py::object get() { + { + py::gil_scoped_release nogil; + wait_for_submission(*copy_state_); + rethrow_if_async_failed(*copy_state_); + cuda_events_sync_or_throw(copy_state_->events); + } + + return build_output(root_, copy_state_->outputs); + } + + private: + template + static py::object build_output_sequence(const Node& n, const std::vector& outputs) { + PySeq out(static_cast(n.seq.size())); + for (size_t i = 0; i < n.seq.size(); ++i) { + out[static_cast(i)] = build_output(n.seq[i], outputs); + } + return out; + } + + static py::object build_output(const Node& n, const std::vector& outputs) { + switch (n.kind) { + case NodeKind::List: { + py::object out = build_output_sequence(n, outputs); + return out; + } + case NodeKind::Tuple: { + py::object out = build_output_sequence(n, outputs); + return out; + } + case NodeKind::Dict: { + py::dict out; + for (const auto& kv : n.items) { + out[kv.first] = build_output(kv.second, outputs); + } + return out; + } + case NodeKind::TensorLeaf: { + if (n.tensor_idx >= outputs.size()) { + throw std::runtime_error("Internal error: TensorLeaf index out of range."); + } + return py::cast(outputs[n.tensor_idx]); + } + case NodeKind::Passthrough: + default: + return n.obj; + } + } + + static void wait_for_submission(const CopyState& cs) { + if (cs.done.valid()) { + cs.done.wait(); + } + } + + static void rethrow_if_async_failed(const CopyState& cs) { + if (cs.exc) { + std::rethrow_exception(cs.exc); + } + } + + static bool cuda_events_ready(const std::vector& events) { + const bool out = for_each_cuda_event_while_action_succeeds(events, [](cudaEvent_t ev) { + const auto st = cudaEventQuery(ev); + if (st == cudaErrorNotReady) { + cudaGetLastError(); // clear sticky error + return false; + } + if (st != cudaSuccess) { + throw std::runtime_error(std::string("cudaEventQuery failed: ") + cudaGetErrorString(st)); + } + return true; + }); + return out; + } + + static void cuda_events_sync_or_throw(const std::vector& events) { + for_each_cuda_event(events, [](cudaEvent_t ev) { + const auto st = cudaEventSynchronize(ev); + if (st != cudaSuccess) { + throw std::runtime_error(std::string("cudaEventSynchronize failed: ") + + cudaGetErrorString(st)); + } + }); + } + + void cleanup_no_throw() noexcept { + if (!copy_state_) { + return; + } + try { + wait_for_submission(*copy_state_); + } catch (...) { + // swallow + } + try { + for_each_cuda_event(copy_state_->events, [](cudaEvent_t ev) { + (void)cudaEventSynchronize(ev); + (void)cudaGetLastError(); + }); + } catch (...) { + // swallow + } + } + + Node root_; + std::shared_ptr copy_state_; +}; + +// Core implementation behind the pybind wrapper. +// +// Responsibilities: +// - traverse Python structure under the GIL using the provided conversion context (numpy/torch cache) +// - create CopyState and store user flags +// - either schedule immediately without the GIL, or enqueue scheduling to the worker pool +// - return an AsyncCopyHandle that can be waited via ready()/get() +static AsyncCopyHandle start_copy_impl(py::object data, const std::string& device, bool use_pinned_staging, + bool use_background_thread, bool pack_cpu_tensors, + int64_t min_packed_alignment_bytes, int64_t max_packed_chunk_bytes, + const PyConversionCtx& ctx) { + auto copy_state = std::make_shared(); + Node root = traverse_build_tree_impl(data, ctx, copy_state->inputs); + copy_state->target_device = parse_device(device); + copy_state->use_pinned_staging = use_pinned_staging; + copy_state->pack_cpu_tensors = pack_cpu_tensors; + copy_state->min_packed_alignment_bytes = std::max(1, min_packed_alignment_bytes); + copy_state->max_packed_chunk_bytes = std::max(1, max_packed_chunk_bytes); + + // Capture the user's current CUDA streams while still on the caller's thread. + // These are used later (potentially on a background thread) so that copy work is + // correctly ordered with the user's preceding GPU operations. + if (copy_state->target_device.is_cuda()) { + const auto dev_idx = (copy_state->target_device.index() >= 0) + ? copy_state->target_device.index() + : static_cast(at::cuda::current_device()); + copy_state->target_stream = at::cuda::getCurrentCUDAStream(dev_idx); + } + // For each unique source CUDA device, capture the current stream. When the target is + // also CUDA (D2D), record an event to pin the cross-device sync point to this moment. + const bool need_capture_events = copy_state->target_device.is_cuda(); + for (const auto& t : copy_state->inputs) { + if (t.device().is_cuda()) { + const int dev = static_cast(t.device().index()); + if (copy_state->src_streams.find(dev) == copy_state->src_streams.end()) { + const auto stream = at::cuda::getCurrentCUDAStream(static_cast(dev)); + copy_state->src_streams.emplace(dev, stream); + + if (need_capture_events) { + c10::cuda::CUDAGuard guard(static_cast(dev)); + cudaEvent_t ev = nullptr; + auto st = cudaEventCreateWithFlags(&ev, cudaEventDisableTiming); + if (st != cudaSuccess) { + throw std::runtime_error(std::string("cudaEventCreateWithFlags failed: ") + + cudaGetErrorString(st)); + } + st = cudaEventRecord(ev, stream.stream()); + if (st != cudaSuccess) { + cudaEventDestroy(ev); + throw std::runtime_error(std::string("cudaEventRecord failed: ") + + cudaGetErrorString(st)); + } + copy_state->src_capture_events.try_emplace(dev, ev, dev); + } + } + } + } + + if (use_background_thread) { + // Submit staging + copy enqueue work to the shared pool (no Python API usage). + auto promise = std::make_shared>(); + copy_state->done = promise->get_future().share(); + CopyThreadPool::instance().enqueue([copy_state, promise]() { + try { + schedule_copies(*copy_state); + } catch (...) { + copy_state->exc = std::current_exception(); + } + try { + promise->set_value(); + } catch (...) { + // swallow + } + }); + } else { + py::gil_scoped_release nogil; + try { + schedule_copies(*copy_state); + } catch (...) { + copy_state->exc = std::current_exception(); + } + auto promise = std::make_shared>(); + copy_state->done = promise->get_future().share(); + promise->set_value(); + if (copy_state->exc) { + std::rethrow_exception(copy_state->exc); + } + } + + AsyncCopyHandle out(std::move(root), std::move(copy_state)); + return out; +} + +PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { + // Python-owned cache of expensive-to-look-up objects used during traversal. + // Storing this on the module keeps lifetime tied to Python interpreter shutdown (no C++ static py::object issues). + py::dict py_cache; + py::object numpy = py::module_::import("numpy"); + py::object torch = py::module_::import("torch"); + py_cache["numpy_ndarray_type"] = numpy.attr("ndarray"); + py_cache["numpy_ascontiguousarray"] = numpy.attr("ascontiguousarray"); + py_cache["torch_from_numpy"] = torch.attr("from_numpy"); + m.attr("_py_cache") = py_cache; + + py::class_(m, "AsyncCopyHandle") + .def("get", &AsyncCopyHandle::get, "Wait for transfers (if any) and return the copied structure.") + .def("ready", &AsyncCopyHandle::ready, "Return True if all enqueued async copies have completed."); + + // Pybind entrypoint wrapper. + // + // Responsibilities: + // - traverse Python structure under the GIL (build skeleton + collect inputs + setters), + // converting numpy.ndarray leaves to tensors using the module-owned cache + // - create CopyState and store user flags + // - either schedule immediately without the GIL, or enqueue scheduling to the worker pool + // - return an AsyncCopyHandle that can be waited via ready()/get() + m.def( + "start_copy", + [py_cache](py::object data, const std::string& device, bool use_pinned_staging, + bool use_background_thread, bool pack_cpu_tensors, int64_t min_packed_alignment_bytes, + int64_t max_packed_chunk_bytes) { + const PyConversionCtx ctx = make_py_conversion_ctx_from_cache(py_cache); + return start_copy_impl(std::move(data), device, use_pinned_staging, use_background_thread, + pack_cpu_tensors, min_packed_alignment_bytes, max_packed_chunk_bytes, ctx); + }, + "Start an async copy of a nested list/tuple/dict of tensors to the given device (string).", + py::arg("data"), py::arg("device"), py::arg("use_pinned_staging") = true, + py::arg("use_background_thread") = true, py::arg("pack_cpu_tensors") = true, + py::arg("min_packed_alignment_bytes") = 16, py::arg("max_packed_chunk_bytes") = 32 * 1024 * 1024); +} diff --git a/packages/multi_tensor_copier/docs/api.rst b/packages/multi_tensor_copier/docs/api.rst new file mode 100644 index 0000000..e5a55a3 --- /dev/null +++ b/packages/multi_tensor_copier/docs/api.rst @@ -0,0 +1,4 @@ +API Reference +============= + +.. automodule:: accvlab.multi_tensor_copier diff --git a/packages/multi_tensor_copier/docs/evaluation.rst b/packages/multi_tensor_copier/docs/evaluation.rst new file mode 100644 index 0000000..90cec8c --- /dev/null +++ b/packages/multi_tensor_copier/docs/evaluation.rst @@ -0,0 +1,95 @@ +Evaluation +========== + +This page summarizes the performance of ``multi-tensor-copier`` compared to standard PyTorch +``.to()`` calls when copying training meta-data from CPU to GPU. + +Setup +----- + +The benchmark uses the same data structure as the :doc:`example `: per-sample meta-data from +a multi-camera 3D object detection pipeline, containing variable-size bounding boxes, class IDs, +active flags, depths, and projection matrices for 6 cameras, plus ground truth 3D bounding boxes +with associated attributes. See the :doc:`example ` for the full data structure description. + +.. list-table:: Benchmark Configuration + :header-rows: 1 + + * - Parameter + - Value + * - Batch size + - 16 samples + * - Total tensors per batch + - 528 + * - Total transfer size per batch + - ~150 KB + * - Runs + - 10 + * - Warmup iterations (per run) + - 100 + * - Measured iterations (per run) + - 1000 + +Two baselines are compared against ``multi-tensor-copier``: + +- **``.to()`` hardcoded** -- per-tensor ``.to(device)`` calls with the data structure known at + development time (representative of a manual implementation in a training pipeline). +- **``.to()`` generic** -- a recursive traversal that copies all tensors in an arbitrary nested + structure using ``.to(device)``, with ``isinstance`` checks and dictionary key iteration at each + level. + +.. note:: + + The evaluation measures only the copy time itself, without any concurrent work. In practice, + ``multi_tensor_copier`` allows overlapping the copy with other computation (see the + :doc:`example `), which can hide some of the latency. The speedups + reported here therefore reflect the improvement in raw copy throughput, not necessarily the full + potential benefit in an end-to-end training loop. + +Hardware +~~~~~~~~ + +.. list-table:: System Configuration + :header-rows: 1 + + * - GPU + - CPU + * - NVIDIA RTX 5000 Ada Generation + - AMD Ryzen 9 7950X 16-Core Processor + + +Results +------- + +.. list-table:: Runtime and Speedup (mean +/- std over 10 runs) + :header-rows: 1 + + * - Method + - Runtime [ms] + - Speedup + * - ``.to()`` hardcoded + - 3.035 +/- 0.006 + - (baseline) + * - ``.to()`` generic + - 3.172 +/- 0.006 + - (baseline) + * - ``multi_tensor_copier`` + - 0.375 +/- 0.008 + - **8.10x** +/- 0.16 vs hardcoded, **8.47x** +/- 0.16 vs generic + +The ``multi-tensor-copier`` package achieves a speedup of approximately **8x** over both baselines. +The generic traversal baseline is slightly slower than the hardcoded baseline due to Python overhead from +``isinstance`` checks and dictionary key iteration, but the difference is small compared to the +overall runtime. + +.. note:: + + In this example the absolute copy time of the baseline (~3 ms with ``.to()``) is moderate. As the + complexity of the meta-data grows (e.g. with additional variable-length annotations such as lane + geometry with multiple lanes per sample that cannot be combined into single tensors), the number of + tensors and thus the overall transfer overhead increases, leading to larger optimization potential. + Similarly, larger batch sizes multiply the number of tensors proportionally. + +.. seealso:: + + The evaluation script can be found at ``packages/multi_tensor_copier/example/evaluation.py``. diff --git a/packages/multi_tensor_copier/docs/example.rst b/packages/multi_tensor_copier/docs/example.rst new file mode 100644 index 0000000..6a5e14b --- /dev/null +++ b/packages/multi_tensor_copier/docs/example.rst @@ -0,0 +1,73 @@ +Example +======= + +Here, we provide an example of how to use the ``multi-tensor-copier`` package to efficiently copy data +containing many small tensors in a nested structure (here: training meta-data) from CPU to GPU. + +The example consists of the following steps: + +1. Construction of per-sample meta-data with variable-size tensors (for illustration purposes; in a real + use-case, the meta-data originates e.g. from a PyTorch DataLoader) +2. Asynchronous copy of the entire batch of meta-data to the GPU +3. Overlapping useful work with the transfer +4. Retrieval and consumption of the GPU-resident meta-data + +.. important:: + + You can run the example using the script ``packages/multi_tensor_copier/example/example.py``. + + +Example Data Structure +---------------------- + +The meta-data is organized as a list of per-sample dictionaries (one per batch element). Each sample +dictionary contains: + +- ``"cams_gt"``: a list of 6 camera dicts, each holding: + + - ``"bounding_boxes"``: an ``(N, 4)`` tensor of 2D bounding boxes, where ``N`` varies per image + (number of visible objects) + - ``"class_ids"``: an ``(N,)`` tensor of integer class IDs + - ``"active"``: an ``(N,)`` boolean tensor indicating active objects + - ``"depths"``: an ``(N,)`` tensor of depth values + - ``"proj_mat"``: a ``(3, 4)`` projection matrix from camera coordinates to image coordinates + +- ``"gt_data"``: a dict with: + + - ``"bounding_boxes_3d"``: an ``(N, 7)`` tensor of 3D ground truth bounding boxes, where ``N`` varies per + sample (number of ground truth objects) + - ``"class_ids"``: an ``(N,)`` tensor of integer class IDs + - ``"active"``: an ``(N,)`` boolean tensor indicating active objects + +This nested structure of lists, dicts, and variable-size tensors is representative of real-world +training tasks (e.g. a multi-camera 3D object detection like +`StreamPETR `_). It is also a +scenario where standard PyTorch ``.to()`` calls are particularly inefficient: the batch contains many small +tensors in non-pinned memory, and each individual ``.to()`` call incurs overhead that can +dominate the actual transfer time for a small tensor. See the :doc:`introduction ` for a detailed discussion of the +motivation and the optimizations that ``multi-tensor-copier`` applies to ensure efficient copying in this +scenario. + + +Workflow +-------- + +The optimizations described in the :doc:`intro` are applied automatically (all enabled by default). + +The following snippet shows the core workflow. After the batch meta-data has been assembled (see the +full script at ``packages/multi_tensor_copier/example/example.py`` for the data creation helpers used in this +example), we pass it to :func:`~accvlab.multi_tensor_copier.start_copy` together with the target device. +The function traverses the nested structure and returns an +:class:`~accvlab.multi_tensor_copier.AsyncCopyHandle` while the transfer proceeds in the background. +Because the copy runs asynchronously, the main thread is free to perform other operations while the +transfer is in flight (e.g. computations not involving the copied data, logging, etc.). +Finally, :meth:`~accvlab.multi_tensor_copier.AsyncCopyHandle.get` blocks until the copy is complete +and returns the a nested structure corresponding to the input, but with all tensors now residing on the GPU. + +.. note-literalinclude:: ../example/example.py + :language: python + :caption: packages/multi_tensor_copier/example/example.py + :linenos: + :lineno-match: + :start-at: # ----------------------- Create the batch meta-data ----------------------- + :end-before: if __name__ diff --git a/packages/multi_tensor_copier/docs/index.rst b/packages/multi_tensor_copier/docs/index.rst new file mode 100644 index 0000000..0ac92d8 --- /dev/null +++ b/packages/multi_tensor_copier/docs/index.rst @@ -0,0 +1,21 @@ +Multi-Tensor Copier +=================== + +This is the documentation for the ``accvlab.multi_tensor_copier`` package. + + +This documentation contains the following sections: + +.. toctree:: + :maxdepth: 1 + + intro + api + example + evaluation + + +.. + Index + ----- + * :ref:`genindex` diff --git a/packages/multi_tensor_copier/docs/intro.rst b/packages/multi_tensor_copier/docs/intro.rst new file mode 100644 index 0000000..9c3f31c --- /dev/null +++ b/packages/multi_tensor_copier/docs/intro.rst @@ -0,0 +1,124 @@ +Introduction +============ + +The ``multi_tensor_copier`` package provides functionality for efficient copying of tensors contained in +nested structures (lists, tuples, dicts) between devices. +Its primary goal is to optimize **CPU to GPU transfers**, especially for many small tensors in non-pinned +memory. Other copy directions (GPU to CPU, GPU to GPU, CPU to CPU) are also supported and benefit from +some of the optimizations, but are not the main focus. + +Motivation +---------- + +Standard PyTorch copy operations (e.g. :meth:`torch.Tensor.to`) have two properties that make them +inefficient for the scenario of transferring many small tensors to the GPU (e.g. for transferring per-sample +meta-data to the GPU): + +1. **Non-pinned memory cannot be copied asynchronously.** PyTorch's ``non_blocking=True`` only yields + truly asynchronous host-to-device (H2D) transfers when the source tensor resides in pinned (page-locked) + memory. In some workloads -- e.g. when tensors originate from a :class:`torch.utils.data.DataLoader` + with ``pin_memory=False`` or when they are obtained e.g. by reading pickled numpy arrays -- this + precondition is not met, and every transfer blocks the calling thread. + +2. **Per-tensor overhead dominates for small tensors.** Each call to ``.to()`` incurs overhead. For small + tensors (e.g. variable-length annotations in object detection), this overhead can exceed the actual + transfer time, so that if many small tensors are present, this can lead to a considerable overhead and + dominate the actual transfer time. + +.. note:: + + Apart from improving copying efficiency, the package also makes copying multiple tensors more convenient + by automatically traversing the input structure and copying all contained tensors to the target device. + +Features +-------- + +The package addresses the efficiency issues through the following optimizations, all of which are +configurable (i.e. can be enabled or disabled): + +**Automatic packing of small tensors** (``pack_cpu_tensors``, default: enabled) + Multiple small contiguous CPU tensors (up to 256 KB each, mixed dtypes supported) are **automatically** + packed into one or more fixed-size byte buffers and transferred with one H2D copy per buffer. On the + GPU side, per-tensor views into the packed allocations are created with configurable alignment + (``min_packed_alignment_bytes``) enforced for the individual outputs. This optimization is **only + applicable to CPU to GPU transfers**. + + .. important:: + + This feature is a major contribution to the overall performance optimization vs. using standard + PyTorch ``.to()`` calls on the individual tensors. For this optimization to be applied, the input CPU + tensors must be contiguous. + +**Parallel pinned memory staging** (``use_pinned_staging``, default: enabled) + For CPU to GPU transfers, input tensors are first copied into pinned host buffers (in parallel) so that the + subsequent H2D transfer can use ``non_blocking=True``. For GPU to CPU transfers, + output is written directly into a pinned host buffer via an asynchronous D2H copy on a CUDA stream, and the + pinned tensor is returned as the result. + +**Background-thread scheduling** (``use_background_thread``, default: enabled) + The copy orchestration (buffer allocation, staging, and CUDA copy submission) runs on a C++ background + thread rather than the calling Python thread. + :func:`~accvlab.multi_tensor_copier.start_copy` returns a handle before the copies complete; + the caller can do other work and retrieve results via + :meth:`~accvlab.multi_tensor_copier.AsyncCopyHandle.get`. Note that parallel CPU staging is used regardless + of this setting. The background-thread scheduling benefits all copy directions, including CPU to CPU. + +**Nested structure traversal** + Input may be an arbitrarily nested combination of :class:`list`, :class:`tuple`, and :class:`dict` + containers with :class:`torch.Tensor` or :class:`numpy.ndarray` leaves. The output preserves the original + structure. Non-tensor, non-container leaves (e.g. strings) are passed through unchanged. Numpy + arrays are converted to PyTorch tensors during traversal. The automatic handling of nested structures + **greatly simplifies copying of nested structures of tensors** while also allowing for **automatic packing + of small tensors** (see above) without the need for manual bookkeeping. + +Integration +----------- + +The copy can be started wherever the data is needed -- not e.g. only directly after it is obtained from +a PyTorch DataLoader. For example, if the GPU-resident data is only required for loss computation, +:func:`~accvlab.multi_tensor_copier.start_copy` can be called at the beginning of the loss +computation step, ideally with some work performed in the meantime to overlap with the asynchronous +copy. This means the package can be integrated into existing training loops with only local +modifications, and can also be used with data originating from other sources than a DataLoader. + +.. note:: + + At the time of the :func:`~accvlab.multi_tensor_copier.start_copy` call, the active PyTorch + streams on all involved CUDA devices are captured. All copy work is then enqueued on, or + synchronized with, these captured streams so that transfers are correctly ordered with respect + to preceding GPU operations — no manual synchronization is required. Non-default stream + contexts (e.g. :func:`torch.cuda.stream`) are respected. + +Supported Copy Directions +------------------------- + +The table below summarizes which optimizations apply to each copy direction: + +.. list-table:: + :header-rows: 1 + :widths: 25 15 15 15 15 + + * - Optimization + - CPU → GPU + - GPU → CPU + - GPU → GPU + - CPU → CPU + * - Pinned staging + - ✓ + - ✓ + - + - + * - Packing + - ✓ + - + - + - + * - Background thread + - ✓ + - ✓ + - ✓ + - ✓ + +.. seealso:: + + Refer to the :doc:`api` for the full parameter reference and the :doc:`example` for a usage example. diff --git a/packages/multi_tensor_copier/docu_referenced_dirs.txt b/packages/multi_tensor_copier/docu_referenced_dirs.txt new file mode 100644 index 0000000..9d9794e --- /dev/null +++ b/packages/multi_tensor_copier/docu_referenced_dirs.txt @@ -0,0 +1,6 @@ +# This file lists additional directories (besides docs) that are referenced by documentation +# The docs directory is always mirrored automatically +# Add one directory name per line, without the docs directory +# Lines starting with # are comments and are ignored + +example diff --git a/packages/multi_tensor_copier/example/evaluation.py b/packages/multi_tensor_copier/example/evaluation.py new file mode 100644 index 0000000..28035a4 --- /dev/null +++ b/packages/multi_tensor_copier/example/evaluation.py @@ -0,0 +1,211 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Evaluation script for measuring the runtime of multi_tensor_copier vs. naive PyTorch .to() copying. + +Uses the same data structure as the example (multi-camera 3D detection meta-data). +""" + +import time + +import torch +import numpy as np + +import accvlab.multi_tensor_copier as mtc + +try: + from example import create_batch_meta_data +except ImportError: + raise ImportError( + "Could not import 'example'. Please run this script from the " + "'packages/multi_tensor_copier/example/' directory." + ) from None + + +NUM_RUNS = 10 +NUM_WARMUP = 100 +NUM_ITERATIONS = 1000 +DEVICE = "cuda:0" + + +def copy_batch_meta_data_naive(batch_meta_data, device): + """Copy batch meta-data to device using per-tensor .to() calls with known structure. + + This is representative of what a manual implementation would look like when the data layout + is known at development time (as is typically the case in a training pipeline). + """ + gpu_batch = [] + for sample in batch_meta_data: + gpu_cams = [] + for cam in sample["cams_gt"]: + gpu_cams.append( + { + "bounding_boxes": cam["bounding_boxes"].to(device), + "class_ids": cam["class_ids"].to(device), + "active": cam["active"].to(device), + "depths": cam["depths"].to(device), + "proj_mat": cam["proj_mat"].to(device), + } + ) + gpu_sample = { + "cams_gt": gpu_cams, + "gt_data": { + "bounding_boxes_3d": sample["gt_data"]["bounding_boxes_3d"].to(device), + "class_ids": sample["gt_data"]["class_ids"].to(device), + "active": sample["gt_data"]["active"].to(device), + }, + } + gpu_batch.append(gpu_sample) + return gpu_batch + + +def copy_nested_to_device_generic(data, device): + """Recursively copy all tensors in a nested structure to `device` using .to(). + + Generic version that walks the structure without knowing its layout. + """ + if isinstance(data, torch.Tensor): + return data.to(device) + if isinstance(data, list): + return [copy_nested_to_device_generic(item, device) for item in data] + if isinstance(data, tuple): + return tuple(copy_nested_to_device_generic(item, device) for item in data) + if isinstance(data, dict): + return {k: copy_nested_to_device_generic(v, device) for k, v in data.items()} + return data + + +def benchmark(copy_fn, batch_meta_data, device, num_warmup, num_iterations): + for _ in range(num_warmup): + _ = copy_fn(batch_meta_data, device) + torch.cuda.synchronize() + + times = [] + for _ in range(num_iterations): + torch.cuda.synchronize() + t0 = time.perf_counter() + _ = copy_fn(batch_meta_data, device) + torch.cuda.synchronize() + t1 = time.perf_counter() + times.append(t1 - t0) + return times + + +def benchmark_mtc(batch_meta_data, device, num_warmup, num_iterations): + for _ in range(num_warmup): + handle = mtc.start_copy(batch_meta_data, device) + _ = handle.get() + torch.cuda.synchronize() + + times = [] + for _ in range(num_iterations): + torch.cuda.synchronize() + t0 = time.perf_counter() + handle = mtc.start_copy(batch_meta_data, device) + _ = handle.get() + torch.cuda.synchronize() + t1 = time.perf_counter() + times.append(t1 - t0) + return times + + +def mean_ms(times_s): + return np.mean(times_s) * 1000 + + +def count_tensors(data): + """Count the total number of tensors in a nested structure.""" + if isinstance(data, torch.Tensor): + return 1 + if isinstance(data, (list, tuple)): + return sum(count_tensors(item) for item in data) + if isinstance(data, dict): + return sum(count_tensors(v) for v in data.values()) + return 0 + + +def total_bytes(data): + """Compute total bytes of all tensors in a nested structure.""" + if isinstance(data, torch.Tensor): + return data.numel() * data.element_size() + if isinstance(data, (list, tuple)): + return sum(total_bytes(item) for item in data) + if isinstance(data, dict): + return sum(total_bytes(v) for v in data.values()) + return 0 + + +def main(): + batch_meta_data = create_batch_meta_data() + + n_samples = len(batch_meta_data) + n_tensors = count_tensors(batch_meta_data) + n_bytes = total_bytes(batch_meta_data) + + print("=" * 70) + print("multi_tensor_copier evaluation") + print("=" * 70) + print(f" Batch size: {n_samples} samples") + print(f" Total tensors: {n_tensors}") + print(f" Total bytes: {n_bytes} ({n_bytes / 1024:.1f} KB)") + print(f" Target device: {DEVICE}") + print(f" Runs: {NUM_RUNS}") + print(f" Warmup iters: {NUM_WARMUP} (per run)") + print(f" Measured iters: {NUM_ITERATIONS} (per run)") + print() + + hardcoded_means = [] + generic_means = [] + mtc_means = [] + + for run in range(NUM_RUNS): + print(f"Run {run + 1}/{NUM_RUNS}...") + + hardcoded_times = benchmark( + copy_batch_meta_data_naive, batch_meta_data, DEVICE, NUM_WARMUP, NUM_ITERATIONS + ) + hardcoded_means.append(mean_ms(hardcoded_times)) + + generic_times = benchmark( + copy_nested_to_device_generic, batch_meta_data, DEVICE, NUM_WARMUP, NUM_ITERATIONS + ) + generic_means.append(mean_ms(generic_times)) + + mtc_times = benchmark_mtc(batch_meta_data, DEVICE, NUM_WARMUP, NUM_ITERATIONS) + mtc_means.append(mean_ms(mtc_times)) + + hardcoded_means = np.array(hardcoded_means) + generic_means = np.array(generic_means) + mtc_means = np.array(mtc_means) + + speedups_hardcoded = hardcoded_means / mtc_means + speedups_generic = generic_means / mtc_means + + print() + print("Results (mean runtime per run, aggregated over runs):") + print("-" * 70) + print(f" .to() hardcoded: {hardcoded_means.mean():.3f} +/- {hardcoded_means.std():.3f} ms") + print(f" .to() generic: {generic_means.mean():.3f} +/- {generic_means.std():.3f} ms") + print(f" multi_tensor_copier: {mtc_means.mean():.3f} +/- {mtc_means.std():.3f} ms") + print() + print( + f" Speedup vs .to() hardcoded: {speedups_hardcoded.mean():.2f}x +/- {speedups_hardcoded.std():.2f}x" + ) + print(f" Speedup vs .to() generic: {speedups_generic.mean():.2f}x +/- {speedups_generic.std():.2f}x") + print("=" * 70) + + +if __name__ == "__main__": + main() diff --git a/packages/multi_tensor_copier/example/example.py b/packages/multi_tensor_copier/example/example.py new file mode 100644 index 0000000..3a2080b --- /dev/null +++ b/packages/multi_tensor_copier/example/example.py @@ -0,0 +1,164 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import torch + +import accvlab.multi_tensor_copier as mtc + +NUM_CAMERAS = 6 +# The config contains 4 samples. This means that the batch size is +# 4 * NUM_SAMPLE_CONFIG_REPEATS (= 16 samples for the default value). +NUM_SAMPLE_CONFIG_REPEATS = 4 + + +def create_sample_meta_data(num_gt_objects: int, num_visible_objects_per_cam: list[int]) -> dict: + """Create a single sample's meta-data dict, simulating a multi-camera 3D detection pipeline.""" + + # @NOTE + # Each sample contains two top-level entries: + # - "cams_gt": a list with one dict per camera, holding variable-size 2D bounding boxes, + # class IDs, active flags, depths, and a projection matrix + # - "gt_data": a dict with 3D ground truth bounding boxes, class IDs, and active flags + # Several tensors have a variable first dimension (number of visible / GT objects), + # which is the typical scenario that benefits from multi_tensor_copier: many small, + # variable-size tensors that need to be transferred together. + cams = [] + for cam_idx in range(NUM_CAMERAS): + n_visible = num_visible_objects_per_cam[cam_idx] + cam_data = { + "bounding_boxes": torch.rand(n_visible, 4), + "class_ids": torch.randint(0, 10, (n_visible,)), + "active": torch.randint(0, 2, (n_visible,), dtype=torch.bool), + "depths": torch.rand( + n_visible, + ), + "proj_mat": torch.rand(3, 4), + } + cams.append(cam_data) + + gt_data = { + "bounding_boxes_3d": torch.rand(num_gt_objects, 7), + "class_ids": torch.randint(0, 10, (num_gt_objects,)), + "active": torch.randint(0, 2, (num_gt_objects,), dtype=torch.bool), + } + + sample_data = {"cams_gt": cams, "gt_data": gt_data} + + return sample_data + + +def create_batch_meta_data() -> list[dict]: + """Assemble a batch of per-sample meta-data dicts.""" + + # @NOTE + # In a real training loop the meta-data originates from the DataLoader. Here we create + # synthetic data where each sample has a different number of GT objects and a different + # number of visible objects per camera, reflecting realistic variability. + base_sample_configs = [ + {"num_gt": 60, "visible_per_cam": [30, 20, 40, 10, 50, 20]}, + {"num_gt": 100, "visible_per_cam": [60, 40, 30, 70, 20, 50]}, + {"num_gt": 45, "visible_per_cam": [10, 20, 10, 30, 20, 10]}, + {"num_gt": 120, "visible_per_cam": [80, 50, 60, 40, 70, 30]}, + ] + sample_configs = base_sample_configs * NUM_SAMPLE_CONFIG_REPEATS + + batch = [] + for cfg in sample_configs: + sample = create_sample_meta_data(cfg["num_gt"], cfg["visible_per_cam"]) + batch.append(sample) + return batch + + +def dummy_compute(): + """Placeholder for useful work that can overlap with the CPU-to-GPU copy.""" + _ = torch.ones(256, 256) @ torch.ones(256, 256) + + +def dummy_process(gpu_meta_data: list[dict]): + """Placeholder for a function that consumes the copied meta-data on the GPU.""" + for sample_idx, sample in enumerate(gpu_meta_data): + num_cams = len(sample["cams_gt"]) + num_gt = sample["gt_data"]["bounding_boxes_3d"].shape[0] + device = sample["gt_data"]["bounding_boxes_3d"].device + print(f" Sample {sample_idx}: {num_cams} cameras, {num_gt} GT objects (device: {device})") + + +def main(): + + # ----------------------- Create the batch meta-data ----------------------- + + # @NOTE + # Here, the per-sample meta-data tensors are not combined into per-batch meta-data tensors. This is due to + # the fact that e.g. the number of visible objects per camera varies per sample, making combination & + # handling of combined tensors cumbersome. + batch_meta_data = create_batch_meta_data() + + # ------------------------ Start the asynchronous copy ------------------------ + + # @NOTE + # `start_copy()` traverses the nested structure of lists, tuples, and dicts, and + # asynchronously copies all contained tensors to the target device. It returns a handle + # before the copy is complete so that the calling thread can continue with other work while the transfer + # is in progress. + # Under the hood, the package applies several optimizations automatically (all enabled by + # default): tensors are staged into pinned host memory for truly non-blocking H2D copies, + # and small tensors are packed into (one or more) staging buffers to reduce per-tensor overhead. All of + # this runs on a background thread, so this call returns before the copies complete. + # + # Note that this copy can be started anywhere the data is needed (i.e. not only when obtaining the data + # from a DataLoader), so that it can be used with only local modifications to the training loop. For + # example, if the meta-data on the GPU is only needed for loss computation, the copy can + # be started inside the loss computation implementation (ideally with some work done in the meantime to + # overlap with the asynchronous copy). + handle = mtc.start_copy(batch_meta_data, "cuda:0") + + # @NOTE + # IMPORTANT: Because the copy runs asynchronously, the input tensors must not be freed or + # modified in-place until the copy has completed (i.e. until `handle.get()` returns or + # `handle.ready()` returns `True`). See the `start_copy()` function documentation for details. + + # -------------------- Overlap with other work -------------------- + + # @NOTE + # Because `start_copy()` is asynchronous, we can overlap the CPU-to-GPU transfer with + # other computation. Note that running asynchrounously with the copy is not the only (and not the most + # important) optimization that is applied, so that this is beneficial but optional. + dummy_compute() + + # -------------------- Retrieve and use the results -------------------- + + # @NOTE + # `handle.get()` blocks until the copy is complete and returns the same nested structure + # with all tensors now residing on the target device. Non-tensor leaves (if any) are + # passed through unchanged. + gpu_meta_data = handle.get() + + # @NOTE + # The copied data can now be consumed by downstream GPU operations (e.g. the detection + # head, loss computation, etc.). + # + # Note on performance: For this simplified example, multi_tensor_copier achieves a + # significant speedup over naive per-tensor .to() calls (see the evaluation script for + # measurements). However, the absolute overhead of meta-data copying is moderate here. + # In more complex real-world pipelines, the meta-data can be more extensive (e.g. additional + # variable-length lane geometry where multiple lanes per sample cannot be combined into + # single tensors due to variable size; or additional sensor modalities), which increases the number of + # tensors and thus the per-tensor overhead. In such cases -- and with larger batch sizes -- the absolute + # time savings grow accordingly. + print("GPU meta-data ready:") + dummy_process(gpu_meta_data) + + +if __name__ == "__main__": + main() diff --git a/packages/multi_tensor_copier/pyproject.toml b/packages/multi_tensor_copier/pyproject.toml new file mode 100644 index 0000000..4ac2c73 --- /dev/null +++ b/packages/multi_tensor_copier/pyproject.toml @@ -0,0 +1,28 @@ +[build-system] +requires = [ + "setuptools>=64", + "wheel", + "torch>=2.0.0", +] +build-backend = "setuptools.build_meta" + +[project] +name = "accvlab.multi_tensor_copier" +version = "0.1.0" +description = "Async copying of nested tensor structures (ACCV-Lab)." +requires-python = ">=3.8" +dependencies = [ + "torch>=2.0.0", + "numpy>=1.22.2", + "accvlab-build-config>=0.1.0", +] + +[project.optional-dependencies] +optional = [ + "pytest", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["accvlab.multi_tensor_copier*"] + diff --git a/packages/multi_tensor_copier/setup.py b/packages/multi_tensor_copier/setup.py new file mode 100644 index 0000000..4eff528 --- /dev/null +++ b/packages/multi_tensor_copier/setup.py @@ -0,0 +1,55 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from setuptools import find_namespace_packages, setup +from torch.utils.cpp_extension import BuildExtension, CUDAExtension + +from accvlab_build_config import detect_cuda_info, get_compile_flags, load_config + + +def get_extensions(): + config = load_config() + cuda_info = detect_cuda_info() + compile_flags = get_compile_flags(config, cuda_info) + + return [ + CUDAExtension( + name="accvlab.multi_tensor_copier._ext", + sources=[ + "accvlab/multi_tensor_copier/csrc/multi_tensor_copier.cpp", + ], + extra_compile_args={ + "cxx": compile_flags["cxx"], + "nvcc": compile_flags["nvcc"], + }, + define_macros=[ + ("TORCH_EXTENSION_NAME", "_ext"), + ], + include_dirs=compile_flags["include_dirs"], + ), + ] + + +setup( + name="accvlab.multi_tensor_copier", + version="0.1.0", + description="Async copying of nested tensor structures (ACCV-Lab).", + packages=find_namespace_packages(include=["accvlab.multi_tensor_copier*"]), + ext_modules=get_extensions(), + cmdclass={"build_ext": BuildExtension}, + python_requires=">=3.8", + zip_safe=False, +) diff --git a/packages/multi_tensor_copier/tests/test_multi_tensor_copier.py b/packages/multi_tensor_copier/tests/test_multi_tensor_copier.py new file mode 100644 index 0000000..5c1068a --- /dev/null +++ b/packages/multi_tensor_copier/tests/test_multi_tensor_copier.py @@ -0,0 +1,399 @@ +# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import numpy as np +import pytest +import torch + + +def _storage_data_ptr(t: torch.Tensor) -> int: + # Prefer untyped storage (newer PyTorch); fall back for older versions. + if hasattr(t, "untyped_storage"): + return int(t.untyped_storage().data_ptr()) + return int(t.storage().data_ptr()) # type: ignore[attr-defined] + + +def _round_up(x: int, a: int) -> int: + if a <= 1: + return x + rem = x % a + return x if rem == 0 else (x + (a - rem)) + + +def test_multi_tensor_copier_nested_structure_and_values(): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + # Nested structure with tuples/lists (output preserves tuples). + data = [ + torch.arange(12, dtype=torch.float32).reshape(3, 4), + ( + torch.ones((2, 3), dtype=torch.float16) * 7, + [ + torch.zeros((1,), dtype=torch.int64), + (torch.randn((5,), dtype=torch.float32),), + ], + ), + ] + + # background-thread path (returns earlier; staging/submission happens off-thread). + h = mtc.start_copy(data, device, use_pinned_staging=True) + out = h.get() + + assert isinstance(out, list) + assert isinstance(out[0], torch.Tensor) + assert isinstance(out[1], tuple) + + leaves_in = [data[0], data[1][0], data[1][1][0], data[1][1][1][0]] + leaves_out = [out[0], out[1][0], out[1][1][0], out[1][1][1][0]] + + for a, b in zip(leaves_in, leaves_out): + assert b.device == device + assert a.shape == b.shape + assert a.dtype == b.dtype + torch.testing.assert_close(a, b.cpu()) + + assert h.ready() is True + + +def test_multi_tensor_copier_numpy_to_cpu_tensors_only(): + # numpy.ndarray leaves should be accepted and converted to CPU torch.Tensors during traversal. + import accvlab.multi_tensor_copier as mtc + + data = [ + np.arange(12, dtype=np.float32).reshape(3, 4), + (np.ones((2, 3), dtype=np.float32), [np.zeros((1,), dtype=np.int64)]), + ] + + h = mtc.start_copy(data, "cpu", pack_cpu_tensors=False) + out = h.get() + + assert isinstance(out, list) + assert isinstance(out[0], torch.Tensor) + assert out[0].device.type == "cpu" + torch.testing.assert_close(out[0], torch.from_numpy(data[0])) + + assert isinstance(out[1], tuple) + assert isinstance(out[1][0], torch.Tensor) + torch.testing.assert_close(out[1][0], torch.from_numpy(data[1][0])) + + assert isinstance(out[1][1], list) + assert isinstance(out[1][1][0], torch.Tensor) + torch.testing.assert_close(out[1][1][0], torch.from_numpy(data[1][1][0])) + + +def test_multi_tensor_copier_dict_and_passthrough_leaves(): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + marker = object() + + data = { + "a": torch.arange(6, dtype=torch.float32).reshape(2, 3), + "b": (np.ones((2,), dtype=np.float32), {"meta": marker}), + } + + h = mtc.start_copy(data, device, use_pinned_staging=True) + out = h.get() + + assert isinstance(out, dict) + assert set(out.keys()) == {"a", "b"} + assert isinstance(out["b"], tuple) + assert isinstance(out["b"][1], dict) + assert out["b"][1]["meta"] is marker # passthrough identity preserved + + assert isinstance(out["a"], torch.Tensor) + assert out["a"].device == device + torch.testing.assert_close(out["a"].cpu(), data["a"]) + + assert isinstance(out["b"][0], torch.Tensor) + assert out["b"][0].device == device + torch.testing.assert_close(out["b"][0].cpu(), torch.from_numpy(data["b"][0])) + + +def test_multi_tensor_copier_root_leaf_behavior(): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + t = torch.arange(5, dtype=torch.int64) + h = mtc.start_copy(t, device) + out = h.get() + assert isinstance(out, torch.Tensor) + assert out.device == device + torch.testing.assert_close(out.cpu(), t) + + marker = object() + h2 = mtc.start_copy(marker, device) + out2 = h2.get() + assert out2 is marker + + +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +def test_multi_tensor_copier_pack_cpu_tensors(use_pinned_staging: bool): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + # Many small CPU tensors of the same dtype -> should trigger packing when enabled. + data = [ + [torch.arange(64, dtype=torch.float32) + i for i in range(50)], + [torch.ones((16,), dtype=torch.float32) * 3.0, torch.zeros((8,), dtype=torch.float32)], + ] + + h = mtc.start_copy( + data, + device, + use_pinned_staging=use_pinned_staging, + pack_cpu_tensors=True, + ) + + out = h.get() + assert isinstance(out, list) + assert isinstance(out[0], list) + assert isinstance(out[1], list) + + for i in range(50): + torch.testing.assert_close(out[0][i].cpu(), data[0][i]) + assert out[0][i].device == device + + for i in range(len(data[1])): + torch.testing.assert_close(out[1][i].cpu(), data[1][i]) + assert out[1][i].device == device + + +@pytest.mark.parametrize("min_packed_alignment_bytes", [1, 16, 6]) +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +def test_multi_tensor_copier_pack_cpu_tensors_mixed_dtypes_and_alignment( + min_packed_alignment_bytes: int, use_pinned_staging: bool +): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + # Mixed dtypes (incl. complex). Include one non-contiguous tensor to ensure it falls back. + a_f32 = torch.arange(32, dtype=torch.float32).reshape(8, 4) + a_i64 = torch.arange(17, dtype=torch.int64) + a_f16 = (torch.arange(11, dtype=torch.float16) + 1).reshape(-1) + a_c64 = (torch.arange(9, dtype=torch.float32) + 1j * torch.arange(9, dtype=torch.float32)).to( + torch.complex64 + ) + a_c128 = (torch.arange(5, dtype=torch.float64) + 1j * torch.arange(5, dtype=torch.float64)).to( + torch.complex128 + ) + a_noncontig = torch.arange(12, dtype=torch.float32).reshape(3, 4).t() # non-contiguous + + data = [a_f32, [a_i64, (a_f16, [a_c64, a_c128, a_noncontig])]] + + h = mtc.start_copy( + data, + device, + use_pinned_staging=use_pinned_staging, + pack_cpu_tensors=True, + min_packed_alignment_bytes=min_packed_alignment_bytes, + ) + out = h.get() + + leaves_in = [a_f32, a_i64, a_f16, a_c64, a_c128, a_noncontig] + leaves_out = [out[0], out[1][0], out[1][1][0], out[1][1][1][0], out[1][1][1][1], out[1][1][1][2]] + + for a, b in zip(leaves_in, leaves_out): + assert b.device == device + assert a.shape == b.shape + assert a.dtype == b.dtype + torch.testing.assert_close(a, b.cpu()) + + # Alignment check for the packed buffer views (if packing happened): + # group outputs by shared storage base pointer and validate offsets in the largest group. + storage_ptrs = [_storage_data_ptr(t) for t in leaves_out] + base_ptr = max(set(storage_ptrs), key=storage_ptrs.count) + packed_out = [t for t in leaves_out if _storage_data_ptr(t) == base_ptr] + assert len(packed_out) >= 2 # packing should trigger for this case + for t in packed_out: + elem_sz = int(t.element_size()) + required_align = max(int(min_packed_alignment_bytes), elem_sz) + required_align = _round_up(required_align, elem_sz) + byte_off = int(t.data_ptr()) - int(base_ptr) + assert byte_off % required_align == 0 + + +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +def test_multi_tensor_copier_gpu_to_cpu(use_pinned_staging: bool): + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + data_cpu = [ + torch.arange(12, dtype=torch.float32).reshape(3, 4), + ( + torch.ones((2, 3), dtype=torch.float16) * 7, + [ + torch.zeros((1,), dtype=torch.int64), + (torch.randn((5,), dtype=torch.float32),), + ], + ), + ] + data_gpu = [ + data_cpu[0].to(device), + ( + data_cpu[1][0].to(device), + [ + data_cpu[1][1][0].to(device), + (data_cpu[1][1][1][0].to(device),), + ], + ), + ] + + h = mtc.start_copy(data_gpu, "cpu", use_pinned_staging=use_pinned_staging) + out = h.get() + + assert isinstance(out, list) + assert isinstance(out[0], torch.Tensor) + assert isinstance(out[1], tuple) + + leaves_ref = [data_cpu[0], data_cpu[1][0], data_cpu[1][1][0], data_cpu[1][1][1][0]] + leaves_out = [out[0], out[1][0], out[1][1][0], out[1][1][1][0]] + + for ref, result in zip(leaves_ref, leaves_out): + assert result.device.type == "cpu" + assert ref.shape == result.shape + assert ref.dtype == result.dtype + torch.testing.assert_close(ref, result) + + if use_pinned_staging: + for result in leaves_out: + assert result.is_pinned() + + assert h.ready() is True + + +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +def test_multi_tensor_copier_gpu_to_cpu_many_small_tensors(use_pinned_staging: bool): + """D2H with many small tensors.""" + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + data_gpu = [torch.arange(64, dtype=torch.float32, device=device) + i for i in range(50)] + + h = mtc.start_copy(data_gpu, "cpu", use_pinned_staging=use_pinned_staging) + out = h.get() + + assert isinstance(out, list) + assert len(out) == 50 + + for i in range(50): + expected = torch.arange(64, dtype=torch.float32) + i + assert out[i].device.type == "cpu" + torch.testing.assert_close(out[i], expected) + + +@pytest.mark.parametrize("target_device", ["cuda:0", "cpu"]) +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +@pytest.mark.parametrize("pack_cpu_tensors", [True, False]) +def test_multi_tensor_copier_mixed_devices( + target_device: str, use_pinned_staging: bool, pack_cpu_tensors: bool +): + """Mixed CPU + GPU inputs copied to a GPU or CPU target. + + Verifies that CPU and GPU tensors are handled correctly regardless of target device: + tensors already on the target are reused as-is, others are copied. + """ + import accvlab.multi_tensor_copier as mtc + + gpu = torch.device("cuda:0") + target = torch.device(target_device) + + cpu_a = torch.arange(32, dtype=torch.float32) + cpu_b = torch.ones((4, 3), dtype=torch.float16) + gpu_c = torch.randn(5, dtype=torch.float32, device=gpu) + 10 + gpu_d = torch.arange(8, dtype=torch.int64, device=gpu) + + data = [cpu_a, {"gpu": gpu_c, "cpu": cpu_b}, (gpu_d,)] + + h = mtc.start_copy( + data, + target, + use_pinned_staging=use_pinned_staging, + pack_cpu_tensors=pack_cpu_tensors, + ) + out = h.get() + + assert isinstance(out, list) + assert isinstance(out[1], dict) + assert isinstance(out[2], tuple) + + leaves_in = [cpu_a, cpu_b, gpu_c, gpu_d] + leaves_out = [out[0], out[1]["cpu"], out[1]["gpu"], out[2][0]] + + for ref, result in zip(leaves_in, leaves_out): + assert result.device == target + assert result.shape == ref.shape + assert result.dtype == ref.dtype + torch.testing.assert_close(result.cpu(), ref.cpu()) + + # Tensors already on the target device should be reused (same storage). + for ref, result in zip(leaves_in, leaves_out): + if ref.device == target: + assert _storage_data_ptr(result) == _storage_data_ptr(ref) + + # D2H with pinned staging: GPU-origin outputs land in pinned memory. + if target.type == "cpu" and use_pinned_staging: + for ref, result in zip(leaves_in, leaves_out): + if ref.device.type == "cuda": + assert result.is_pinned() + + +@pytest.mark.parametrize("use_pinned_staging", [True, False]) +def test_multi_tensor_copier_pack_chunked(use_pinned_staging: bool): + """Packing with a tiny chunk size forces multiple chunks; verify correctness and distinct storages.""" + import accvlab.multi_tensor_copier as mtc + + device = torch.device("cuda:0") + + data = [torch.arange(64, dtype=torch.float32) + i for i in range(20)] + total_bytes = sum(t.numel() * t.element_size() for t in data) + chunk_limit = 512 + assert total_bytes > chunk_limit, "test expects data to exceed the chunk limit" + + h = mtc.start_copy( + data, + device, + use_pinned_staging=use_pinned_staging, + pack_cpu_tensors=True, + max_packed_chunk_bytes=chunk_limit, + ) + out = h.get() + + assert isinstance(out, list) + assert len(out) == len(data) + + for i, (ref, result) in enumerate(zip(data, out)): + assert result.device == device, f"output {i} on wrong device" + assert result.shape == ref.shape + assert result.dtype == ref.dtype + torch.testing.assert_close(result.cpu(), ref) + + # Verify that multiple GPU storage bases were used (multiple chunks). + base_ptrs = {_storage_data_ptr(t) for t in out} + assert len(base_ptrs) >= 2, ( + f"expected multiple GPU storage chunks but got {len(base_ptrs)}; " + "chunking may not have been applied" + ) + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/packages/optim_test_tools/accvlab/optim_test_tools/numba_nvtx/nvtx.py b/packages/optim_test_tools/accvlab/optim_test_tools/numba_nvtx/nvtx.py index b14960b..eabcba1 100644 --- a/packages/optim_test_tools/accvlab/optim_test_tools/numba_nvtx/nvtx.py +++ b/packages/optim_test_tools/accvlab/optim_test_tools/numba_nvtx/nvtx.py @@ -18,7 +18,6 @@ from . import _nvtx_numba_ext as _ext # type: ignore[attr-defined] - _SYMBOLS_READY = False diff --git a/packages/optim_test_tools/examples/tensor_dumper_dumping_example.py b/packages/optim_test_tools/examples/tensor_dumper_dumping_example.py index 4d9c7e1..5ef732a 100644 --- a/packages/optim_test_tools/examples/tensor_dumper_dumping_example.py +++ b/packages/optim_test_tools/examples/tensor_dumper_dumping_example.py @@ -150,8 +150,8 @@ def __init__(self, tensor: torch.Tensor): # The custom handling is done by adding a custom extension to the dumper, which is then used to dump # the object (the custom converter is registered above). # 3. `unneeded_data` is excluded from the dump. - # This is useful to e.g. exclude data which is part of the structure, but either not needed in the - # dump, or which will be added to the dump later via custom processing logic (see below for bounding + # This is useful to e.g. exclude data which is part of the structure, but either not needed in the + # dump, or which will be added to the dump later via custom processing logic (see below for bounding # box images). dumper.add_tensor_data( "images.other_images",