Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions python_test/test_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import List

from ldpc.codes import rep_code, ring_code, hamming_code
from ldpc.codes.generate_ldpc_peg import generate_ldpc_peg


@pytest.mark.parametrize(
Expand Down Expand Up @@ -82,6 +83,44 @@ def test_hamming_code_rank_value():
with pytest.raises(ValueError):
hamming_code(-1)

@pytest.mark.parametrize("m,n,dv,dc", [(10, 20, 3, 6),])
# Test that generate_ldpc_peg returns a CSR matrix of correct shape
# and type.
def test_generate_ldpc_peg_output_type_and_shape(m: int, n: int, dv: int, dc: int):
H = generate_ldpc_peg(m, n, dv, dc)
assert isinstance(H, sp.csr_matrix)
assert H.shape == (m, n)

@pytest.mark.parametrize("m,n,dv,dc", [(12, 24, 3, 6),])

# Test that all entries of the generated matrix are binary (0 or 1).
def test_generate_ldpc_peg_binary_entries(m: int, n: int, dv: int, dc: int):
H_arr = generate_ldpc_peg(m, n, dv, dc).toarray()
assert set(np.unique(H_arr)).issubset({0, 1})

@pytest.mark.parametrize("m,n,dv,dc", [(15, 30, 2, 4),])

# Test that each variable node has exactly dv edges and each check node
# has degree at most dc.
def test_generate_ldpc_peg_degrees(m: int, n: int, dv: int, dc: int):
H = generate_ldpc_peg(m, n, dv, dc)
col_sums = np.array(H.sum(axis=0)).flatten()
np.testing.assert_array_equal(col_sums, np.full(n, dv))
row_sums = np.array(H.sum(axis=1)).flatten()
assert np.all(row_sums <= dc)

@pytest.mark.parametrize("m,n,dv,dc", [(20, 40, 2, 6),])

# Test that the generated matrix contains no 4-cycles, i.e.
# any two variable nodes share at most one common check neighbor.
def test_generate_ldpc_peg_no_four_cycles(m: int, n: int, dv: int, dc: int):
H = generate_ldpc_peg(m, n, dv, dc)
overlap = (H.T @ H).toarray()
diag = np.diag(overlap)
np.testing.assert_array_equal(diag, np.full(n, dv))
off_diag = overlap - np.diag(diag)
assert np.all(off_diag <= 1)


if __name__ == "__main__":
pytest.main([__file__])
97 changes: 97 additions & 0 deletions src_python/ldpc/codes/generate_ldpc_peg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import numpy as np
import scipy.sparse
from collections import deque

def generate_ldpc_peg(m: int, n: int, dv: int, dc: int) -> scipy.sparse.csr_matrix:
"""
Generate an (m x n) LDPC parity-check matrix using the Progressive Edge-Growth (PEG) algorithm.

Parameters:
m (int): Number of check nodes (rows).
n (int): Number of variable nodes (columns).
dv (int): Degree of each variable node (number of edges per variable).
dc (int): Maximum degree of each check node (capacity per check).

Returns:
scipy.sparse.csr_matrix: The generated LDPC parity-check matrix in CSR format.

The PEG algorithm incrementally builds a Tanner graph by adding edges one at a time
to maximize the girth (length of shortest cycle) of the bipartite graph. Each variable
node connects to dv checks, choosing the "farthest" available check to avoid short cycles.
"""
# Ensure total capacity is sufficient
if n * dv > m * dc:
raise ValueError(f"Insufficient capacity: n*dv ({n*dv}) > m*dc ({m*dc})")

# Initialize the adjacency matrix and degree trackers
H = np.zeros((m, n), dtype=np.int8) # H[c, v] = 1 if check c connects to variable v
deg_v = np.zeros(n, dtype=int) # degree of each variable node
deg_c = np.zeros(m, dtype=int) # degree of each check node

# Adjacency lists for BFS
var_to_checks = [[] for _ in range(n)] # checks connected to each variable
check_to_vars = [[] for _ in range(m)] # variables connected to each check

def bfs_distances(start_v: int) -> np.ndarray:
"""
Compute shortest-path distances from variable node start_v to all check nodes
in the current partial graph using breadth-first search.
Unconnected checks remain at distance -1.
"""
dist_c = -np.ones(m, dtype=int)
visited_vars = [False] * n
queue = deque()

# Mark start variable and enqueue its direct check neighbors
visited_vars[start_v] = True
for c in var_to_checks[start_v]:
dist_c[c] = 1
queue.append(('c', c))

while queue:
kind, idx = queue.popleft()
if kind == 'c':
# From a check node, go to connected variables
for vv in check_to_vars[idx]:
if not visited_vars[vv]:
visited_vars[vv] = True
# Then from each variable, go to its checks
for cc in var_to_checks[vv]:
if dist_c[cc] == -1:
dist_c[cc] = dist_c[idx] + 2
queue.append(('c', cc))
return dist_c

# Main PEG loop: add dv edges per variable node
for v in range(n):
for _ in range(dv):
# Compute distances to all checks
dists = bfs_distances(v)
INF = m + n # treat unreachable as infinite
effective = np.where(dists >= 0, dists, INF)

# Choose check nodes at maximum distance
max_dist = effective.max()
candidates = [c for c, d in enumerate(effective) if d == max_dist]

# Exclude already connected or full checks
eligible = [c for c in candidates if deg_c[c] < dc and c not in var_to_checks[v]]
# Fallback: allow any check with capacity
if not eligible:
eligible = [c for c in range(m) if deg_c[c] < dc]
# Fallback: allow all checks to prevent emptiness
if not eligible:
eligible = list(range(m))

# From eligible, pick the least-used
best_c = min(eligible, key=lambda c: deg_c[c])

# Connect v to best_c
H[best_c, v] = 1
deg_v[v] += 1
deg_c[best_c] += 1
var_to_checks[v].append(best_c)
check_to_vars[best_c].append(v)

# Return sparse parity-check matrix
return scipy.sparse.csr_matrix(H)