Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ This section provides a brief overview of the contained packages & evaluations p
see the `CONTAINED PACKAGES` section in the documentation for a more detailed description of the packages,
their functionality and usage, as well as the API reference for each package and examples demonstrating the
usage of the packages. Evaluation results are also provided for some of the packages (On-Demand Video Decoder,
Batching Helpers, DALI Pipeline Framework).
Batching Helpers, Multi-Tensor Copier, DALI Pipeline Framework).

> **ℹ️ Note**: We are planning to add demos for packages contained in ACCV-Lab in the future. Apart from
> acting as tutorials show-casing real-world examples, they will include the implementation of the experiments
Expand All @@ -34,6 +34,10 @@ The contained packages are:
training.
- **Batching Helpers**: Facilitates easy-to-implement batching for non‑uniform sample sizes, a common issue in
loss computation in the ADAS domain.
- **Multi-Tensor Copier**: Efficient copying of tensors in nested structures (lists, tuples, dicts) from
CPU to GPU. Automatically finds all tensors in the structure, asynchronously applies pinned memory staging,
packs small tensors into a single transfer, and performs other optimizations to significantly reduce the
overhead of copying many small, variable-size tensors e.g. typical in ADAS training meta-data.
- **DALI Pipeline Framework**: Framework on top of
[NVIDIA DALI](https://docs.nvidia.com/deeplearning/dali/user-guide/docs/) that simplifies creation of
pipelines for typical ADAS use‑cases and enables integration into existing training implementations with
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Please see the documentation of each namespace package for usage instructions (a

contained_package_docs_mirror/on_demand_video_decoder/docs/index
contained_package_docs_mirror/batching_helpers/docs/index
contained_package_docs_mirror/multi_tensor_copier/docs/index
contained_package_docs_mirror/dali_pipeline_framework/docs/index
contained_package_docs_mirror/draw_heatmap/docs/index
contained_package_docs_mirror/optim_test_tools/docs/index
Expand Down
7 changes: 7 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ photometric
randomization
rtype
dtype
dtypes
uint
functor
prefetch
Expand Down Expand Up @@ -185,6 +186,12 @@ literalinclude
blockquote
distributable
posix
destructor
parallelly
dicts
enqueue
enqueued
JIT
prepend
prepended
profiler
1 change: 1 addition & 0 deletions namespace_packages_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# 'accvlab.example_skbuild_package',
'accvlab.on_demand_video_decoder',
'accvlab.batching_helpers',
'accvlab.multi_tensor_copier',
'accvlab.dali_pipeline_framework',
'accvlab.draw_heatmap',
'accvlab.optim_test_tools',
Expand Down
2 changes: 1 addition & 1 deletion packages/batching_helpers/docs/example.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Example
========
=======

Here, we provide an example of how to use the `batching-helpers` package to implement object detection loss,
including
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def test_set_data_from_dali_generic_iterator_output_with_flattened_names():
src["sequence"][0]["value"] = 0.0
src["sequence"][1]["value"] = 1.0

# Simulate DALIGenericIterator output: a list of dicts keyed by flattened names
# Simulate DALIGenericIterator output: a list of dictionaries keyed by flattened names
flattened_names = src.field_names_flat
flat_values = src.get_data()
iterator_like_output = [{name: value for name, value in zip(flattened_names, flat_values)}]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .async_copy import AsyncCopyHandle, start_copy

__all__ = [
"AsyncCopyHandle",
"start_copy",
]
169 changes: 169 additions & 0 deletions packages/multi_tensor_copier/accvlab/multi_tensor_copier/async_copy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import torch
import numpy as np

from . import _ext


@dataclass
class AsyncCopyHandle:
"""Handle to an in-progress asynchronous copy started by :func:`start_copy`.

Use :meth:`ready` to poll for completion without blocking, or :meth:`get` to block until
the result is available. The handle must be kept alive until the copy is consumed or goes
out of scope; dropping it early triggers a synchronous wait in the destructor to ensure
staging buffers are not freed while transfers are in flight.
"""

_h: Any

def ready(self) -> bool:
"""Return whether the copy has completed.

Returns:
``True`` if the copy has completed, ``False`` otherwise.
"""
return bool(self._h.ready())

def get(self) -> list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor:
"""Block until the copy is done and return the result.

The returned structure mirrors the input to :func:`start_copy`, with all tensors
copied to the target device.

Returns:
The structure with the contained tensors copied to the target device (and numpy arrays replaced by
PyTorch tensors).

Raises:
RuntimeError: If the copy fails

"""
return self._h.get()


def start_copy(
data: list[Any] | tuple[Any, ...] | dict[Any, Any] | torch.Tensor | np.ndarray,
device: str | torch.device,
*,
use_pinned_staging: bool = True,
pack_cpu_tensors: bool = True,
min_packed_alignment_bytes: int = 16,
max_packed_chunk_bytes: int = 32 * 1024 * 1024,
use_background_thread: bool = True,
) -> AsyncCopyHandle:
"""Asynchronously copy tensors in a nested structure to ``device``.

Traverses an arbitrarily nested combination of :class:`list`, :class:`tuple`, and :class:`dict`
containers, copies every :class:`torch.Tensor` and :class:`numpy.ndarray` (automatically converted to
PyTorch tensors) leaf to ``device``, and returns an :class:`AsyncCopyHandle` whose
:meth:`~AsyncCopyHandle.get` method yields the copied structure. The output preserves container types and
passes through non-tensor, non-container leaves (e.g. strings) unchanged.

The primary optimization target is **CPU → GPU** transfers of many small tensors in non-pinned
memory. Other copy directions (GPU → CPU, GPU → GPU, CPU → CPU) are supported and benefit from
some optimizations (e.g. background-thread scheduling for all directions, parallel pinned staging for
D2H), but are not the main focus.

.. note::

The input tensors do not need to all be on the same device, copying tensors from different devices is
supported. If some tensors are already on the target device, they will be re-used as is.

.. important::

Packing of small tensors (see ``pack_cpu_tensors`` parameter below) is a major contribution to the
overall performance optimization vs. using standard PyTorch ``.to()`` calls on the individual tensors.
For this optimization to be applied, the input CPU tensors must be contiguous.

.. warning::

The caller must not **free** or **modify in-place** any input tensors until the copy has completed
(i.e. until :meth:`~AsyncCopyHandle.get` returns or :meth:`~AsyncCopyHandle.ready` returns ``True``).
Because copies are submitted asynchronously — potentially on a background thread — input tensor memory
may still be read by the GPU after this function returns. Violating this contract leads to undefined
behavior (silent data corruption, stale reads, or CUDA errors).

Important:
Only :class:`list`, :class:`tuple`, and :class:`dict` are recognized as container types.
Other container-like objects (e.g. custom classes, named tuples) are treated as opaque
leaves and returned unchanged; any tensors nested inside them will **not** be copied.

Args:
data: The structure to copy. May be a single :class:`torch.Tensor` or
:class:`numpy.ndarray`, or a nested :class:`list`/:class:`tuple`/:class:`dict` objects containing
tensor/numpy arrays.
device: Target PyTorch device (e.g. ``"cuda:0"``, ``"cpu"``).
use_pinned_staging: When ``True``, allocate pinned (page-locked) host buffers as
intermediate staging for CPU → CUDA and CUDA → CPU transfers. For H2D this enables
``non_blocking`` copies; for D2H the pinned buffer **is** the returned output tensor.
Has no effect on CPU → CPU or GPU → GPU copies.
pack_cpu_tensors: When ``True``, pack multiple small contiguous CPU tensors (≤ 256 KB
each, mixed dtypes supported) into one or more staging buffers (each at most
``max_packed_chunk_bytes``) and issue one H2D transfer per chunk instead of per tensor.
Only applies to CPU → CUDA copies.
min_packed_alignment_bytes: Minimum byte-alignment of each tensor's start offset within
the packed buffer. The effective alignment for each tensor which participates in the packing is
``max(min_packed_alignment_bytes, tensor.element_size())``.
max_packed_chunk_bytes: Maximum payload size in bytes of each packed staging chunk (tensor
data plus inter-tensor alignment padding; the actual allocation may be slightly larger to
satisfy buffer-start alignment). When the total packed data exceeds this limit, multiple
packed chunks are allocated and transferred. Defaults to 32 MB.
use_background_thread: When ``True``, the copy orchestration (buffer allocation, staging,
and CUDA copy submission) runs on a C++ background thread (from a shared pool) so that this
function returns before the copies complete. Note that CPU staging is done parallelly regardless
of this setting. Benefits all copy directions.

Returns:
Handle to the in-progress copy. Call :meth:`~AsyncCopyHandle.get` to block until completion
and retrieve the result, or :meth:`~AsyncCopyHandle.ready` to poll without blocking.

Raises:
RuntimeError: If the copy fails (propagated on :meth:`~AsyncCopyHandle.get`).

Examples:
Copy a nested structure of tensors to the GPU::

data = [torch.tensor([1, 2, 3]), torch.tensor([4, 5, 6])]
handle = start_copy(data, "cuda:0")
# ... do other work ...
result = handle.get() # [tensor([1,2,3], device='cuda:0'), ...]

Convert numpy arrays to CPU tensors (makes use of the fact that numpy arrays can be used as inputs and
benefits from background-thread scheduling)::

data = [np.array([1, 2, 3]), np.array([4, 5, 6])]
handle = start_copy(data, "cpu")
result = handle.get() # [tensor([1, 2, 3]), tensor([4, 5, 6])]
"""
dev = torch.device(device)
h = _ext.start_copy(
data,
str(dev),
bool(use_pinned_staging),
bool(use_background_thread),
bool(pack_cpu_tensors),
int(min_packed_alignment_bytes),
int(max_packed_chunk_bytes),
)
handle = AsyncCopyHandle(h)
return handle
Loading