Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,6 @@ cython_debug/
# pixi environments
.pixi/*
!.pixi/config.toml

# Cursor
.cursorrules
21 changes: 21 additions & 0 deletions cuda_core/cuda/core/experimental/_device.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,27 @@ class Device:
bus_id = handle_return(runtime.cudaDeviceGetPCIBusId(13, self._id))
return bus_id[:12].decode()

def can_access_peer(self, peer: Device | int) -> bool:
    """Check if this device can access memory from the specified peer device.

    Queries whether peer-to-peer memory access is supported between this
    device and the specified peer device.

    Parameters
    ----------
    peer : Device | int
        The peer device to check accessibility to. Can be a Device object or device ID.

    Returns
    -------
    bool
        True if this device can access the peer's memory. Always True when
        *peer* refers to this device itself.
    """
    # Device() normalizes an int ID into a Device (and validates the ID).
    peer = Device(peer)
    cdef int d1 = <int> self.device_id
    cdef int d2 = <int> peer.device_id
    # Self-access is trivially available; return early without a driver
    # query (the driver does not treat a device as its own "peer").
    if d1 == d2:
        return True
    cdef int value = 0
    # Release the GIL around the blocking driver call.
    with nogil:
        HANDLE_RETURN(cydriver.cuDeviceCanAccessPeer(&value, d1, d2))
    return bool(value)

@property
def uuid(self) -> str:
"""Return a UUID for the device.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ cdef class DeviceMemoryResource(MemoryResource):
bint _mempool_owned
IPCData _ipc_data
object _attributes
object _peer_accessible_by
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address peer access with IPC memory pools in a follow-up change. The peer access attributes are not inherited when an allocation is sent to another process via IPC, but access can be set. It will require a new test and possibly a small code change.

object __weakref__


cpdef DMR_mempool_get_access(DeviceMemoryResource, int)
110 changes: 110 additions & 0 deletions cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ from __future__ import annotations

from libc.limits cimport ULLONG_MAX
from libc.stdint cimport uintptr_t
from libc.stdlib cimport malloc, free
from libc.string cimport memset

from cuda.bindings cimport cydriver
Expand Down Expand Up @@ -222,6 +223,7 @@ cdef class DeviceMemoryResource(MemoryResource):
self._mempool_owned = False
self._ipc_data = None
self._attributes = None
self._peer_accessible_by = ()

def __init__(self, device_id: Device | int, options=None):
from .._device import Device
Expand Down Expand Up @@ -408,6 +410,73 @@ cdef class DeviceMemoryResource(MemoryResource):
"""
return getattr(self._ipc_data, 'uuid', None)

@property
def peer_accessible_by(self):
    """
    Get or set the devices that can access allocations from this memory
    pool. Access can be modified at any time and affects all allocations
    from this memory pool.

    Returns a tuple of sorted device IDs that currently have peer access to
    allocations from this memory pool. The pool's own device is never
    included in the tuple.

    When setting, accepts a sequence of Device objects or device IDs.
    Setting to an empty sequence revokes all peer access.

    NOTE(review): this reflects only access granted through this property;
    access changed out-of-band via the driver API is not tracked here —
    confirm callers do not mix the two mechanisms.

    Examples
    --------
    >>> dmr = DeviceMemoryResource(0)
    >>> dmr.peer_accessible_by = [1]  # Grant access to device 1
    >>> assert dmr.peer_accessible_by == (1,)
    >>> dmr.peer_accessible_by = []  # Revoke access
    """
    return self._peer_accessible_by

@peer_accessible_by.setter
def peer_accessible_by(self, devices):
    """Set which devices can access this memory pool.

    Parameters
    ----------
    devices : sequence of Device | int
        The devices to grant peer access to. Duplicates and this pool's
        own device are ignored. An empty sequence revokes all access.

    Raises
    ------
    ValueError
        If any requested device lacks peer capability with this device.
    MemoryError
        If the temporary access-descriptor array cannot be allocated.
    """
    from .._device import Device

    # Normalize input to a set of unique device IDs; Device() validates
    # that each ID is in range.
    cdef set[int] target_ids = {Device(dev).device_id for dev in devices}
    target_ids.discard(self._dev_id)  # exclude this device from peer access list
    this_dev = Device(self._dev_id)
    # NOTE(review): this checks the this_dev -> peer direction; confirm
    # that peer capability is symmetric for the peer -> pool direction
    # actually used by cuMemPoolSetAccess.
    cdef list bad = [dev for dev in target_ids if not this_dev.can_access_peer(dev)]
    if bad:
        raise ValueError(f"Device {self._dev_id} cannot access peer(s): {', '.join(map(str, bad))}")
    # Compute the delta against the current access set so we only issue
    # descriptors for devices whose access actually changes.
    cdef set[int] cur_ids = set(self._peer_accessible_by)
    cdef set[int] to_add = target_ids - cur_ids
    cdef set[int] to_rm = cur_ids - target_ids
    cdef size_t count = len(to_add) + len(to_rm)  # transaction size
    cdef cydriver.CUmemAccessDesc* access_desc = NULL
    cdef size_t i = 0

    if count > 0:
        access_desc = <cydriver.CUmemAccessDesc*>malloc(count * sizeof(cydriver.CUmemAccessDesc))
        if access_desc == NULL:
            raise MemoryError("Failed to allocate memory for access descriptors")

        try:
            # Grant read-write access to newly added peers.
            for dev_id in to_add:
                access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE
                access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
                access_desc[i].location.id = dev_id
                i += 1

            # Revoke access from peers no longer in the target set.
            for dev_id in to_rm:
                access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_NONE
                access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
                access_desc[i].location.id = dev_id
                i += 1

            # Apply adds and removals in a single driver transaction.
            with nogil:
                HANDLE_RETURN(cydriver.cuMemPoolSetAccess(self._handle, access_desc, count))
        finally:
            if access_desc != NULL:
                free(access_desc)

    # Sort before storing: the getter documents a *sorted* tuple, and set
    # iteration order is not guaranteed, so `tuple(target_ids)` alone could
    # yield a nondeterministically ordered result.
    self._peer_accessible_by = tuple(sorted(target_ids))


# DeviceMemoryResource Implementation
# -----------------------------------
Expand Down Expand Up @@ -515,6 +584,11 @@ cdef inline DMR_close(DeviceMemoryResource self):
if self._handle == NULL:
return

# This works around nvbug 5698116. When a memory pool handle is recycled
# the new handle inherits the peer access state of the previous handle.
if self._peer_accessible_by:
self.peer_accessible_by = []

try:
if self._mempool_owned:
with nogil:
Expand All @@ -525,3 +599,39 @@ cdef inline DMR_close(DeviceMemoryResource self):
self._attributes = None
self._mempool_owned = False
self._ipc_data = None
self._peer_accessible_by = ()


# Note: this is referenced in instructions to debug nvbug 5698116.
cpdef DMR_mempool_get_access(DeviceMemoryResource dmr, int device_id):
"""
Probes peer access from the given device using cuMemPoolGetAccess.

Parameters
----------
device_id : int or Device
The device to query access for.

Returns
-------
str
Access permissions: "rw" for read-write, "r" for read-only, "" for no access.
"""
from .._device import Device

cdef int dev_id = Device(device_id).device_id
cdef cydriver.CUmemAccess_flags flags
cdef cydriver.CUmemLocation location

location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE
location.id = dev_id

with nogil:
HANDLE_RETURN(cydriver.cuMemPoolGetAccess(&flags, dmr._handle, &location))

if flags == cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE:
return "rw"
elif flags == cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READ:
return "r"
else:
return ""
169 changes: 169 additions & 0 deletions cuda_core/tests/test_memory_peer_access.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import cuda.core.experimental
import pytest
from cuda.core.experimental import Device, DeviceMemoryResource
from cuda.core.experimental._utils.cuda_utils import CUDAError
from helpers.buffers import PatternGen, compare_buffer_to_constant, make_scratch_buffer

NBYTES = 1024


def _mempool_device_impl(num):
    """Return *num* devices suitable for mempool peer-access tests.

    Skips the calling test when there are fewer than *num* GPUs, when any
    device pair lacks peer access, or when any device lacks mempool support.

    Parameters
    ----------
    num : int
        Number of devices required.

    Returns
    -------
    list[Device]
        The first *num* devices, with device 0 left as the current device.
    """
    num_devices = len(cuda.core.experimental.system.devices)
    if num_devices < num:
        # Bug fix: the message was a plain string, so it printed the
        # literal text "{num}" — it must be an f-string.
        pytest.skip(f"Test requires at least {num} GPUs")

    devs = [Device(i) for i in range(num)]
    # Initialize in reverse so device 0 ends up as the current device.
    for i in reversed(range(num)):
        devs[i].set_current()

    if not all(devs[i].can_access_peer(j) for i in range(num) for j in range(num)):
        pytest.skip("Test requires GPUs with peer access")

    if not all(devs[i].properties.memory_pools_supported for i in range(num)):
        pytest.skip("Device does not support mempool operations")

    return devs


@pytest.fixture
def mempool_device_x2():
    """Yield two peer-capable, mempool-capable devices; skip otherwise."""
    return _mempool_device_impl(2)


@pytest.fixture
def mempool_device_x3():
    """Yield three peer-capable, mempool-capable devices; skip otherwise."""
    return _mempool_device_impl(3)


def test_peer_access_basic(mempool_device_x2):
    """Basic tests for dmr.peer_accessible_by.

    Verifies the grant -> use -> revoke lifecycle: copies between devices
    fail before access is granted, succeed while granted, and fail again
    after access is revoked.
    """
    dev0, dev1 = mempool_device_x2
    # Scratch buffers on device 0 filled with constants 0 and 1.
    zero_on_dev0 = make_scratch_buffer(dev0, 0, NBYTES)
    one_on_dev0 = make_scratch_buffer(dev0, 1, NBYTES)
    stream_on_dev0 = dev0.create_stream()
    dmr_on_dev1 = DeviceMemoryResource(dev1)
    buf_on_dev1 = dmr_on_dev1.allocate(NBYTES)

    # No access at first: cross-device copies in either direction must fail.
    assert 0 not in dmr_on_dev1.peer_accessible_by
    with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
        one_on_dev0.copy_to(buf_on_dev1, stream=stream_on_dev0)

    with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
        zero_on_dev0.copy_from(buf_on_dev1, stream=stream_on_dev0)

    # Allow access to device 1's allocations from device 0, then round-trip
    # the pattern 1 through device 1's buffer and back into zero_on_dev0.
    dmr_on_dev1.peer_accessible_by = [dev0]
    assert 0 in dmr_on_dev1.peer_accessible_by
    compare_buffer_to_constant(zero_on_dev0, 0)
    one_on_dev0.copy_to(buf_on_dev1, stream=stream_on_dev0)
    zero_on_dev0.copy_from(buf_on_dev1, stream=stream_on_dev0)
    stream_on_dev0.sync()
    compare_buffer_to_constant(zero_on_dev0, 1)

    # Revoke access: copies must fail again.
    dmr_on_dev1.peer_accessible_by = []
    assert 0 not in dmr_on_dev1.peer_accessible_by
    with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
        one_on_dev0.copy_to(buf_on_dev1, stream=stream_on_dev0)

    with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
        zero_on_dev0.copy_from(buf_on_dev1, stream=stream_on_dev0)


def test_peer_access_property_x2(mempool_device_x2):
    """Test the dmr.peer_accessible_by property (but not its functionality)."""
    # The peer access list is a sorted tuple and always excludes the self
    # device.
    dev0, dev1 = mempool_device_x2
    dmr = DeviceMemoryResource(dev0)

    def check(expected):
        # The property must always return a tuple with the expected value.
        assert isinstance(dmr.peer_accessible_by, tuple)
        assert dmr.peer_accessible_by == expected

    # No access to begin with.
    check(expected=())
    # Exercise assorted input forms: tuples, lists, sets, duplicates, and
    # inclusion of the self device (which must be filtered out).
    # fmt: off
    dmr.peer_accessible_by = (0,) ; check(expected=()) # noqa: E702
    dmr.peer_accessible_by = (1,) ; check(expected=(1,)) # noqa: E702
    dmr.peer_accessible_by = (0, 1) ; check(expected=(1,)) # noqa: E702
    dmr.peer_accessible_by = () ; check(expected=()) # noqa: E702
    dmr.peer_accessible_by = [0, 1] ; check(expected=(1,)) # noqa: E702
    dmr.peer_accessible_by = set() ; check(expected=()) # noqa: E702
    dmr.peer_accessible_by = [1, 1, 1, 1, 1] ; check(expected=(1,)) # noqa: E702
    # fmt: on

    with pytest.raises(ValueError, match=r"device_id must be \>\= 0"):
        dmr.peer_accessible_by = [-1]  # device ID out of bounds

    num_devices = len(cuda.core.experimental.system.devices)

    with pytest.raises(ValueError, match=r"device_id must be within \[0, \d+\)"):
        dmr.peer_accessible_by = [num_devices]  # device ID out of bounds


def test_peer_access_transitions(mempool_device_x3):
    """Advanced tests for dmr.peer_accessible_by."""

    # Check all transitions between peer access states. The implementation
    # performs transactions that add or remove access as needed. This test
    # ensures that the delta logic is working as expected.

    # Doing everything from the point-of-view of device 0, there are four
    # access states:
    #
    # [(), (1,), (2,), (1, 2)]
    #
    # and 4^2-4 = 12 non-identity transitions.

    devs = mempool_device_x3  # Three devices

    # Allocate per-device resources: a stream, a pattern generator, a
    # memory resource, and one buffer per device.
    streams = [dev.create_stream() for dev in devs]
    pgens = [PatternGen(devs[i], NBYTES, streams[i]) for i in range(3)]
    dmrs = [DeviceMemoryResource(dev) for dev in devs]
    bufs = [dmr.allocate(NBYTES) for dmr in dmrs]

    def verify_state(state, pattern_seed):
        """
        Verify an access state from the POV of device 0. E.g., (1,) means
        device 1 has access but device 2 does not.
        """
        # Populate device 0's buffer with a new pattern.
        devs[0].set_current()
        pgens[0].fill_buffer(bufs[0], seed=pattern_seed)
        streams[0].sync()

        for peer in [1, 2]:
            devs[peer].set_current()
            if peer in state:
                # Peer device has access to 0's allocation
                bufs[peer].copy_from(bufs[0], stream=streams[peer])
                # Check the result on the peer device.
                pgens[peer].verify_buffer(bufs[peer], seed=pattern_seed)
            else:
                # Peer device has no access to 0's allocation
                with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
                    bufs[peer].copy_from(bufs[0], stream=streams[peer])

    # For each transition, set the access state before and after, checking for
    # the expected peer access capabilities at each stop. A fresh pattern
    # seed per step ensures stale data cannot mask a failed copy.
    pattern_seed = 0
    states = [(), (1,), (2,), (1, 2)]
    transitions = [(s0, s1) for s0 in states for s1 in states if s0 != s1]
    for init_state, final_state in transitions:
        dmrs[0].peer_accessible_by = init_state
        assert dmrs[0].peer_accessible_by == init_state
        verify_state(init_state, pattern_seed)
        pattern_seed += 1

        dmrs[0].peer_accessible_by = final_state
        assert dmrs[0].peer_accessible_by == final_state
        verify_state(final_state, pattern_seed)
        pattern_seed += 1
Loading