import numpy as np
import scipy.sparse
from collections import deque


def generate_ldpc_peg(m: int, n: int, dv: int, dc: int) -> scipy.sparse.csr_matrix:
    """
    Generate an (m x n) LDPC parity-check matrix using the Progressive Edge-Growth (PEG) algorithm.

    Parameters:
        m (int): Number of check nodes (rows).
        n (int): Number of variable nodes (columns).
        dv (int): Degree of each variable node (number of edges per variable).
        dc (int): Target maximum degree of each check node (capacity per check).

    Returns:
        scipy.sparse.csr_matrix: The generated LDPC parity-check matrix in CSR format.

    Raises:
        ValueError: If the total check capacity is too small (n*dv > m*dc), or if
            dv > m, in which case a variable cannot have dv distinct check neighbours.

    The PEG algorithm incrementally builds a Tanner graph by adding edges one at a time
    to maximize the girth (length of shortest cycle) of the bipartite graph. For every
    new edge of a variable node, the check is chosen among the checks that are not yet
    connected to that variable and still have spare capacity: the farthest one in the
    current graph wins (unreachable checks count as infinitely far), ties are broken by
    the lowest current check degree, then by the lowest index.

    A variable is never connected to the same check twice, so every column of the
    result has exactly dv ones. Only when no unconnected check has spare capacity is
    the capacity limit relaxed for that single edge (best effort).
    """
    # Ensure total capacity is sufficient
    if n * dv > m * dc:
        raise ValueError(f"Insufficient capacity: n*dv ({n*dv}) > m*dc ({m*dc})")
    # A variable needs dv *distinct* checks; with fewer than dv checks this is impossible.
    if n > 0 and dv > m:
        raise ValueError(f"Variable degree dv ({dv}) exceeds number of check nodes m ({m})")

    # Dense adjacency matrix and per-check degree tracker
    H = np.zeros((m, n), dtype=np.int8)  # H[c, v] = 1 if check c connects to variable v
    deg_c = np.zeros(m, dtype=int)       # current degree of each check node

    # Adjacency lists for BFS
    var_to_checks = [[] for _ in range(n)]  # checks connected to each variable
    check_to_vars = [[] for _ in range(m)]  # variables connected to each check

    # Larger than any real check distance, so unreachable checks rank as farthest.
    unreachable = m + n

    def bfs_distances(start_v: int) -> np.ndarray:
        """
        Compute shortest-path distances (in edges) from variable node start_v to all
        check nodes in the current partial graph using breadth-first search.
        Unreachable checks keep the distance -1.
        """
        dist_c = np.full(m, -1, dtype=int)
        seen_vars = [False] * n
        seen_vars[start_v] = True

        # Direct neighbours of the start variable are one edge away.
        queue = deque()
        for c in var_to_checks[start_v]:
            dist_c[c] = 1
            queue.append(c)

        while queue:
            c = queue.popleft()
            # check -> variable -> check is two more edges
            for vv in check_to_vars[c]:
                if seen_vars[vv]:
                    continue
                seen_vars[vv] = True
                for cc in var_to_checks[vv]:
                    if dist_c[cc] == -1:
                        dist_c[cc] = dist_c[c] + 2
                        queue.append(cc)
        return dist_c

    # Main PEG loop: add dv edges per variable node
    for v in range(n):
        for _ in range(dv):
            dists = bfs_distances(v)
            effective = np.where(dists >= 0, dists, unreachable)

            # Never reuse a check already attached to v: that would not add an edge.
            attached = set(var_to_checks[v])
            unconnected = [c for c in range(m) if c not in attached]

            # Prefer checks with spare capacity; relax capacity only as a last resort.
            eligible = [c for c in unconnected if deg_c[c] < dc]
            if not eligible:
                eligible = unconnected

            # Farthest eligible check first, then the least-used one.
            farthest = max(effective[c] for c in eligible)
            best_c = min(
                (c for c in eligible if effective[c] == farthest),
                key=lambda c: deg_c[c],
            )

            # Connect v to best_c
            H[best_c, v] = 1
            deg_c[best_c] += 1
            var_to_checks[v].append(best_c)
            check_to_vars[best_c].append(v)

    # Return sparse parity-check matrix
    return scipy.sparse.csr_matrix(H)