Python : 3.12.11 | packaged by conda-forge | (main, Jun 4 2025, 14:45:31) [GCC 13.3.0]
Platform : Linux-5.4.0-200-generic-x86_64-with-glibc2.31
Legion : 25.3.0 (commit: 04b7d5068c5e75f29684703e8a1b8568b3e59b9a)
Legate : 25.03.02
cuPynumeric : 25.03.02
Numpy : 1.26.4
Scipy : 1.16.0
Numba : (failed to detect)
CTK package : (failed to detect)
GPU driver : 575.57.08
GPU devices :
GPU 0: Tesla P100-SXM2-16GB
GPU 1: Tesla P100-SXM2-16GB
GPU 2: Tesla P100-SXM2-16GB
GPU 3: Tesla P100-SXM2-16GB
I expected to execute a 1 x N matrix multiplication.
import warnings
import cupynumeric
import numpy
import scipy # type: ignore
from legate.core import (
ImageComputationHint,
Scalar,
Shape,
align,
broadcast,
image,
types,
)
from legate_sparse.base import (
CompressedBase,
DenseSparseBase,
pack_to_rect1_store,
unpack_rect1_store,
)
from legate_sparse.config import SparseOpCode, rect1
from legate_sparse.coverage import clone_scipy_arr_kind
from legate_sparse.runtime import runtime
from legate_sparse.settings import settings
from legate_sparse.types import coord_ty, nnz_ty
from legate_sparse.csr import csr_array
from legate_sparse.utils import (
SUPPORTED_DATATYPES,
array_from_store_or_array,
cast_arr,
cast_to_common_type,
cast_to_store,
copy_store,
find_last_user_stacklevel,
get_storage_type,
get_store_from_cupynumeric_array,
is_dtype_supported,
is_scalar_like,
sort_by_rows_then_cols,
store_from_store_or_array,
store_to_cupynumeric_array,
)
def my_gemm(A, B, image_hint = False):
"""
Perform sparse matrix multiplication C = A @ B
Parameters:
-----------
A: csr_array
Input sparse matrix A
B: csr_array
Input sparse matrix B
Returns:
--------
csr_array
The result of the sparse matrix multiplication
"""
# Due to limitations in cuSPARSE, we cannot use a uniform task
# implementation for CSRxCSRxCSR SpGEMM across CPUs, OMPs and GPUs.
# The GPU implementation will create a set of local CSR matrices
# that will be aggregated into a global CSR.
if runtime.num_gpus > 0:
# replacement for the ImagePartition functor to get dense image
# for rows of B, run separate task for this
pos_rect = runtime.create_store(rect1, shape=(A.shape[0],)) # type: ignore
task = runtime.create_auto_task(SparseOpCode.FAST_IMAGE_RANGE)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
B_pos_image_part = task.add_output(pos_rect)
task.add_constraint(align(A_pos_part, B_pos_image_part))
task.add_constraint(
image(A_pos_part, A_crd_part, hint=ImageComputationHint.MIN_MAX)
)
task.execute()
pos = runtime.create_store(rect1, shape=(A.shape[0],)) # type: ignore
crd = runtime.create_store(coord_ty, ndim=1)
vals = runtime.create_store(A.dtype, ndim=1)
task = runtime.create_auto_task(SparseOpCode.SPGEMM_CSR_CSR_CSR_GPU)
C_pos_part = task.add_output(pos)
C_crd_part = task.add_output(crd)
C_vals_part = task.add_output(vals)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
A_vals_part = task.add_input(A.vals)
B_pos_part = task.add_input(B.pos)
B_crd_part = task.add_input(B.crd)
B_vals_part = task.add_input(B.vals)
B_pos_image_part = task.add_input(pos_rect)
# for inter-partition reduction and scans
# Add communicator even for 1 proc, because we expect it in the task
task.add_communicator("nccl")
# Constraints
# By-row split - same way for A and C
task.add_constraint(align(A_pos_part, C_pos_part))
task.add_constraint(
image(A_pos_part, A_crd_part, hint=ImageComputationHint.MIN_MAX)
)
task.add_constraint(
image(A_pos_part, A_vals_part, hint=ImageComputationHint.MIN_MAX)
)
# No partition for unbound stores
# task.add_constraint(image(C_pos_part_out, C_crd_part))
# task.add_constraint(image(C_pos_part_out, C_vals_part))
# For B just taking an image (currently - exact) for the column indices of A partition
# task.add_constraint(image(A_crd_part, B_pos_part))
# TODO (marsaev): we replaced custom image functor with separate task.
# Array class should provide this functionality
task.add_constraint(align(A_pos_part, B_pos_image_part))
task.add_constraint(
image(B_pos_image_part, B_pos_part, hint=ImageComputationHint.MIN_MAX)
)
task.add_constraint(
image(B_pos_part, B_crd_part, hint=ImageComputationHint.MIN_MAX)
)
task.add_constraint(
image(B_pos_part, B_vals_part, hint=ImageComputationHint.MIN_MAX)
)
# num columns in output
task.add_scalar_arg(B.shape[1], types.uint64)
# folded dimension
task.add_scalar_arg(B.shape[0], types.uint64)
# 1 if we want to try faster algorithm but that
# might need more available eager GPU scratch space
# TODO (marsaev): it might make sense to add this as parameter to dot()
task.add_scalar_arg(1 if settings.fast_spgemm() else 0, types.uint64)
task.execute()
# we can keep new stores in the new csr_array
return csr_array(
(vals, crd, pos),
shape=(A.shape[0], B.shape[1]),
dtype=A.dtype,
copy=False,
)
elif not image_hint:
# Create the query result.
q_nnz = runtime.create_store(nnz_ty, shape=(A.shape[0],))
task = runtime.create_auto_task(SparseOpCode.SPGEMM_CSR_CSR_CSR_NNZ)
nnz_per_row_part = task.add_output(q_nnz)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
B_pos_part = task.add_input(B.pos)
B_crd_part = task.add_input(B.crd)
task.add_constraint(align(A_pos_part, nnz_per_row_part))
task.add_constraint(image(A_pos_part, A_crd_part))
# We'll only ask for the rows used by each partition by
# following an image of pos through crd. We'll then use that
# partition to declare the pieces of crd and vals of other that
# are needed by the matmul. The resulting image of coordinates
# into rows of other is not necessarily complete or disjoint.
task.add_constraint(image(A_crd_part, B_pos_part))
# Since the target partition of pos is likely not contiguous,
# we can't use the CompressedImagePartition functor and have to
# fall back to a standard functor. Since the source partition
# of the rows is not complete or disjoint, the images into crd
# and vals are not disjoint either.
task.add_constraint(image(B_pos_part, B_crd_part))
task.execute()
pos, nnz = CompressedBase.nnz_to_pos_cls(q_nnz)
# Block and convert the nnz future into an int.
nnz = int(nnz)
crd = runtime.create_store(coord_ty, shape=(nnz,))
vals = runtime.create_store(A.dtype, shape=(nnz,))
task = runtime.create_auto_task(SparseOpCode.SPGEMM_CSR_CSR_CSR)
C_pos_part_out = task.add_output(pos)
C_crd_part = task.add_output(crd)
C_vals_part = task.add_output(vals)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
A_vals_part = task.add_input(A.vals)
B_pos_part = task.add_input(B.pos)
B_crd_part = task.add_input(B.crd)
B_vals_part = task.add_input(B.vals)
# Add pos to the inputs as well so that we get READ_WRITE
# privileges.
C_pos_part_in = task.add_input(pos)
task.add_constraint(align(A_pos_part, C_pos_part_in))
# Constraints
# By-row split - same way for A and C
task.add_constraint(align(A_pos_part, C_pos_part_out))
task.add_constraint(image(A_pos_part, A_crd_part))
task.add_constraint(image(A_pos_part, A_vals_part))
task.add_constraint(image(C_pos_part_out, C_crd_part))
task.add_constraint(image(C_pos_part_out, C_vals_part))
# For B just taking an image (currently - exact) for the column indices of A partition
task.add_constraint(image(A_crd_part, B_pos_part))
task.add_constraint(image(B_pos_part, B_crd_part))
task.add_constraint(image(B_pos_part, B_vals_part))
task.execute()
return csr_array(
(vals, crd, pos),
shape=Shape((A.shape[0], B.shape[1])),
)
else:
# Create the query result.
q_nnz = runtime.create_store(nnz_ty, shape=(A.shape[0],))
task = runtime.create_auto_task(SparseOpCode.SPGEMM_CSR_CSR_CSR_NNZ)
nnz_per_row_part = task.add_output(q_nnz)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
B_pos_part = task.add_input(B.pos)
B_crd_part = task.add_input(B.crd)
task.add_constraint(align(A_pos_part, nnz_per_row_part))
task.add_constraint(image(A_pos_part, A_crd_part, hint=ImageComputationHint.MIN_MAX))
# We'll only ask for the rows used by each partition by
# following an image of pos through crd. We'll then use that
# partition to declare the pieces of crd and vals of other that
# are needed by the matmul. The resulting image of coordinates
# into rows of other is not necessarily complete or disjoint.
task.add_constraint(image(A_crd_part, B_pos_part, hint=ImageComputationHint.MIN_MAX))
# Since the target partition of pos is likely not contiguous,
# we can't use the CompressedImagePartition functor and have to
# fall back to a standard functor. Since the source partition
# of the rows is not complete or disjoint, the images into crd
# and vals are not disjoint either.
task.add_constraint(image(B_pos_part, B_crd_part, hint=ImageComputationHint.MIN_MAX))
task.execute()
pos, nnz = CompressedBase.nnz_to_pos_cls(q_nnz)
# Block and convert the nnz future into an int.
nnz = int(nnz)
crd = runtime.create_store(coord_ty, shape=(nnz,))
vals = runtime.create_store(A.dtype, shape=(nnz,))
task = runtime.create_auto_task(SparseOpCode.SPGEMM_CSR_CSR_CSR)
C_pos_part_out = task.add_output(pos)
C_crd_part = task.add_output(crd)
C_vals_part = task.add_output(vals)
A_pos_part = task.add_input(A.pos)
A_crd_part = task.add_input(A.crd)
A_vals_part = task.add_input(A.vals)
B_pos_part = task.add_input(B.pos)
B_crd_part = task.add_input(B.crd)
B_vals_part = task.add_input(B.vals)
# Add pos to the inputs as well so that we get READ_WRITE
# privileges.
C_pos_part_in = task.add_input(pos)
task.add_constraint(align(A_pos_part, C_pos_part_in))
# Constraints
# By-row split - same way for A and C
task.add_constraint(align(A_pos_part, C_pos_part_out))
task.add_constraint(image(A_pos_part, A_crd_part, hint=ImageComputationHint.MIN_MAX))
task.add_constraint(image(A_pos_part, A_vals_part, hint=ImageComputationHint.MIN_MAX))
task.add_constraint(image(C_pos_part_out, C_crd_part, hint=ImageComputationHint.MIN_MAX))
task.add_constraint(image(C_pos_part_out, C_vals_part, hint=ImageComputationHint.MIN_MAX))
# For B just taking an image (currently - exact) for the column indices of A partition
task.add_constraint(image(A_crd_part, B_pos_part, hint=ImageComputationHint.MIN_MAX))
task.add_constraint(image(B_pos_part, B_crd_part, hint=ImageComputationHint.MIN_MAX))
task.add_constraint(image(B_pos_part, B_vals_part, hint=ImageComputationHint.MIN_MAX))
task.execute()
return csr_array(
(vals, crd, pos),
shape=Shape((A.shape[0], B.shape[1])),
)
# Copyright 2022-2024 NVIDIA Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This example performs matrix power by repetitively multiplication. We assume
# that the matrix is square, so the number of cols is same as the number of
# rows in the matrix
import argparse
from functools import reduce
import numpy.typing as npt
from common import get_arg_number, parse_common_args
# global states random_seed, rng
global random_seed, rng
# ----------------------------
# Matrix generation functions
# ----------------------------
def create_csr_with_nnz_per_row(nrows, nnz_per_row: int, dtype: npt.DTypeLike = None):
"""Return a CSR matrix with a prescribed number of nonzeros in each row.
Args:
----
nrows: int
Number of rows in the matrix. Number of columns is same as number of rows
nnz_per_row: int
Desired number of nonzero entries in each row
dtype: npt.DTypeLike
Datatype of the values. This should be one of floating point datatypes
"""
dtype = np.float32 if dtype is None else dtype
ncols = nrows
nnz = nnz_per_row * nrows
indptr = np.linspace(
start=0, stop=nnz, num=nrows + 1, endpoint=True, dtype=np.int64
)
cols = rng.integers(0, ncols, nnz).reshape(ncols, nnz_per_row)
cols = np.sort(cols, axis=1).flatten()
vals = np.ones(nnz, dtype=dtype)
matrix = sparse.csr_matrix((vals, cols, indptr), shape=(nrows, ncols))
return matrix
def create_vec_with_nnz_total(ncols: int, nnz_total: int, dtype: npt.DTypeLike = None):
"""Return a (1 x ncols) sparse row vector with exactly `nnz_total` nonzero entries.
This shape works for B @ A when A is (ncols x ncols) and B is (1 x ncols).
"""
dtype = np.float32 if dtype is None else dtype
if nnz_total > ncols:
raise ValueError("nnz_total cannot exceed ncols without duplicates.")
cols = np.random.choice(ncols, size=nnz_total, replace=False)
rows = np.zeros(nnz_total, dtype=np.int64) # single row index
vals = np.ones(nnz_total, dtype=dtype)
return sparse.csr_matrix((vals, (rows, cols)), shape=(1, ncols))
def create_csr_with_nnz_total(nrows, nnz_total, dtype: npt.DTypeLike = None):
"""Return a CSR matrix with a prescribed number of nonzeros in the matrix.
Args:
----
nrows: int
Number of rows in the matrix. Number of columns is same as number of rows
nnz_total: int
Desired number of nonzero entries in the matrix with no expectation of
nonzeros in each row of the matrix
dtype: npt.DTypeLike
Datatype of the values. This should be one of floating point datatypes
"""
dtype = np.float32 if dtype is None else dtype
ncols = nrows
coo_rows = rng.integers(0, nrows, nnz_total)
coo_cols = rng.integers(0, ncols, nnz_total)
vals = np.ones(nnz_total, dtype=dtype)
matrix = sparse.csr_matrix((vals, (coo_rows, coo_cols)), shape=(nrows, ncols))
return matrix
def create_banded_with_nnz_per_row(nrows: int, nnz_per_row: int, dtype: npt.DTypeLike = None):
"""
Return an (nrows x nrows) CSR 'banded' matrix with ~nnz_per_row per row.
The band follows the main diagonal; a small amount of randomness perturbs
band position and swaps a few columns per row to add entropy.
Args
----
nrows: int
Matrix is square: (nrows x nrows).
nnz_per_row: int
Target number of nonzeros per row (capped at nrows).
dtype: npt.DTypeLike
Datatype of the values (defaults to np.float32).
"""
dtype = np.float32 if dtype is None else dtype
n = nrows
if nnz_per_row <= 0 or n == 0:
return sparse.csr_matrix((n, n), dtype=dtype)
k = min(nnz_per_row, n) # enforce bounds
# Split half-bandwidth to left/right of the diagonal
left = (k - 1) // 2
right = k - 1 - left
# Entropy controls
JITTER = 1 # shift band center by -1/0/+1 rows
SWAP_PROB = 0.05 # per-row probability to swap one in-band col with a random out-of-band col
indices = []
indptr = [0]
data = []
for i in range(n):
# Centered window around i, clipped to fit exactly k columns within [0, n-1]
start_nominal = i - left
# Clamp window so it stays length k inside [0, n-k]
window_start = max(0, min(start_nominal, n - k))
# Small random jitter of the band position
if JITTER > 0:
j = np.random.randint(-JITTER, JITTER + 1)
window_start = max(0, min(window_start + j, n - k))
base_cols = np.arange(window_start, window_start + k, dtype=np.int64)
# With small prob, swap one in-band col with a random out-of-band col (if available)
if np.random.random() < SWAP_PROB and k < n:
# pick an in-band index to replace
in_idx = np.random.randint(0, k)
# pick an out-of-band column not in base_cols
# try a few times to find something outside the set
chosen = base_cols[in_idx]
attempts = 0
while attempts < 5:
cand = np.random.randint(0, n)
if cand not in base_cols:
chosen = cand
break
attempts += 1
base_cols[in_idx] = chosen
base_cols.sort()
indices.extend(base_cols.tolist())
data.extend([1.0] * k)
indptr.append(indptr[-1] + k)
A = sparse.csr_matrix((np.asarray(data, dtype=dtype),
np.asarray(indices, dtype=np.int64),
np.asarray(indptr, dtype=np.int64)),
shape=(n, n))
return A
# ------------------------
# Matrix Multiply routines
# ------------------------
def compute_vec_multiply_ntimes(A, B, timer, nwarmups: int = 2, ntimes: int = 4, image_hint = False):
"""Multiply matrix by self ntimes and print the time elapsed.
Args:
----
A: csr_matrix
The input matrix
timer:
Instance of the timer class to measure elapsed time
ntimes:
Number of matrix multiplies or the exponent in A^n
nwarmups:
Number of warmup iterations before
"""
timer.start()
elapsed_time_init_copy = timer.stop()
for _ in range(nwarmups):
output = my_gemm(B, A, image_hint)
elapsed_time_spgemm = [-1.0] * ntimes
elapsed_time_copy = [-1.0] * ntimes
for hop in range(ntimes):
timer.start()
output = my_gemm(B, A, image_hint)
elapsed_time_spgemm[hop] = timer.stop()
timer.start()
B = output.copy()
elapsed_time_copy[hop] = timer.stop()
# TODO: Wrap all the timing information in a dataclass
nelems = reduce(lambda x, y: x * y, A.shape)
sparsity_output = (nelems - output.nnz) * 100.0 / (A.shape[0] ** 2)
print(f"Dimension of A : {A.shape}")
print(f"Output matrix shape : {output.shape}")
print(f"NNZ of A : {A.nnz}")
print(f"NNZ of output : {output.nnz}")
print(f"Sparsity of output (%) : {sparsity_output}")
print(f"Total number of hops : {ntimes}")
print(f"Elapsed time for copy in init (ms) : {elapsed_time_init_copy}")
for hop in range(ntimes):
print(
f"Elapsed time for spgemm for hop {hop} (ms) : {elapsed_time_spgemm[hop]}"
)
print(f"Elapsed time for copy for hop {hop} (ms) : {elapsed_time_copy[hop]}")
def compute_matrix_multiply_ntimes(A, timer, nwarmups: int = 2, ntimes: int = 4, image_hint = False):
"""Multiply matrix by self ntimes and print the time elapsed.
Args:
----
A: csr_matrix
The input matrix
timer:
Instance of the timer class to measure elapsed time
ntimes:
Number of matrix multiplies or the exponent in A^n
nwarmups:
Number of warmup iterations before
"""
timer.start()
B = A.copy()
elapsed_time_init_copy = timer.stop()
for _ in range(nwarmups):
output = my_gemm(A, B, image_hint)
elapsed_time_spgemm = [-1.0] * ntimes
elapsed_time_copy = [-1.0] * ntimes
for hop in range(ntimes):
timer.start()
output = my_gemm(A, B, image_hint)
elapsed_time_spgemm[hop] = timer.stop()
timer.start()
B = output.copy()
elapsed_time_copy[hop] = timer.stop()
# TODO: Wrap all the timing information in a dataclass
nelems = reduce(lambda x, y: x * y, A.shape)
sparsity_output = (nelems - output.nnz) * 100.0 / (A.shape[0] ** 2)
print(f"Dimension of A : {A.shape}")
print(f"Output matrix shape : {output.shape}")
print(f"NNZ of A : {A.nnz}")
print(f"NNZ of output : {output.nnz}")
print(f"Sparsity of output (%) : {sparsity_output}")
print(f"Total number of hops : {ntimes}")
print(f"Elapsed time for copy in init (ms) : {elapsed_time_init_copy}")
for hop in range(ntimes):
print(
f"Elapsed time for spgemm for hop {hop} (ms) : {elapsed_time_spgemm[hop]}"
)
print(f"Elapsed time for copy for hop {hop} (ms) : {elapsed_time_copy[hop]}")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"-n",
"--nrows",
type=str,
default="1k",
dest="nrows",
help="Number of rows in the generated matrix (accepts suffixes 'k', 'm', 'g')",
)
parser.add_argument(
"--nnz-per-row",
type=int,
default=3,
dest="nnz_per_row",
help="Number of nnz per row for generated matrix",
)
parser.add_argument(
"--nnz-total",
type=str,
default="-1",
dest="nnz_total",
help="Total number of nonzeros for the generated matrix. "
"If both --nnz-per-row and --nnz-total are given, "
"--nnz-total takes precedence",
)
parser.add_argument(
"--ntimes",
type=int,
default=4,
dest="ntimes",
help="Number of times A @ A is performed",
)
parser.add_argument(
"--nwarmups",
type=int,
default=2,
dest="nwarmups",
help="Number of warmup iterations before A @ A is timed",
)
parser.add_argument(
"--same-sparsity-for-cpu-and-gpu",
action="store_true",
help="Use NumPy to generate random nos regardless of --package",
)
parser.add_argument(
"--hint",
type=int,
default=1,
help="Use NumPy to generate random nos regardless of --package",
)
parser.add_argument(
"--random-seed",
type=int,
default=42,
help="Random number seed that influences the sparsity pattern",
)
args, _ = parser.parse_known_args()
_, timer, np, sparse, linalg, use_legate = parse_common_args()
nrows = get_arg_number(args.nrows)
nnz_total = get_arg_number(args.nnz_total)
# this is a global variable
global random_seed
global rng
random_seed = args.random_seed
if args.same_sparsity_for_cpu_and_gpu:
message = (
"Using NumPy to generate random numbers and "
"ensure sparsity pattern is the same across NumPy and "
"cuPyNumeric"
)
print(message)
import numpy as numpy
rng = numpy.random.default_rng(random_seed)
else:
rng = np.random.default_rng(random_seed)
timer.start()
if nnz_total > 0:
A = create_csr_with_nnz_total(nrows, nnz_total, np.float32)
B = create_vec_with_nnz_total(nrows, nnz_total/nrows, np.float32)
print("Matrix created with total number of nonzeros")
elif nnz_total < 0 and args.nnz_per_row > 0:
A = create_banded_with_nnz_per_row(nrows, args.nnz_per_row, np.float32)
B = create_vec_with_nnz_total(nrows, args.nnz_per_row, np.float32)
print("Matrix created with number of nonzeros per row")
elapsed_time_matrix_gen = timer.stop()
print("IS THERE A HINT", args.hint)
compute_vec_multiply_ntimes(A, B, timer, int(args.nwarmups), int(args.ntimes), args.hint)
print(f"Elapsed time in matrix creation (ms) : {elapsed_time_matrix_gen}")
Software versions
Python : 3.12.11 | packaged by conda-forge | (main, Jun 4 2025, 14:45:31) [GCC 13.3.0]
Platform : Linux-5.4.0-200-generic-x86_64-with-glibc2.31
Legion : 25.3.0 (commit: 04b7d5068c5e75f29684703e8a1b8568b3e59b9a)
Legate : 25.03.02
cuPynumeric : 25.03.02
Numpy : 1.26.4
Scipy : 1.16.0
Numba : (failed to detect)
CTK package : (failed to detect)
GPU driver : 575.57.08
GPU devices :
GPU 0: Tesla P100-SXM2-16GB
GPU 1: Tesla P100-SXM2-16GB
GPU 2: Tesla P100-SXM2-16GB
GPU 3: Tesla P100-SXM2-16GB
Jupyter notebook / Jupyter Lab version
No response
Expected behavior
I expected to execute a 1 x N matrix multiplication.
Observed behavior
I got the following error message:
Example code or instructions
Stack traceback or browser console output
No response