Commits (21)
55ff031
Added code for closeness_centrality
SKADE2303 Mar 24, 2025
3bae54a
Added code for degree_centrality
SKADE2303 Mar 24, 2025
1f812a9
Added tests for Closeness_centrality.py
SKADE2303 Mar 24, 2025
8a7137a
Updated betweenness.py to support empty chunks and empty results
SKADE2303 Mar 24, 2025
12bc19d
Added tests for degree_centrality.py
SKADE2303 Mar 24, 2025
18d030c
Refactor imports to unify all centrality functions under the module n…
SKADE2303 Mar 24, 2025
b575480
Added all centrality functions to the __all__ list for proper export
SKADE2303 Mar 24, 2025
1ec1bb4
Fixed closeness script to get rid of errors
SKADE2303 Mar 24, 2025
b92c3a0
Created heatmap for closeness_centrality
SKADE2303 Mar 24, 2025
04057cb
Added heatmap for degree_centrality
SKADE2303 Mar 24, 2025
9646cbf
Updated .pre-commit-config.yaml
SKADE2303 Mar 24, 2025
72caca4
Fix linting issues with ruff
SKADE2303 Mar 24, 2025
faed653
Removed implementation of closeness_centrality from this PR
SKADE2303 Mar 25, 2025
b61396e
Reverted betweenness.py to original state and removed fix of handli…
SKADE2303 Mar 25, 2025
7b71fa7
Changed interface and __init__.py to remove centrality features
SKADE2303 Mar 25, 2025
0312138
Updated timing directory and removed closeness_centrality
SKADE2303 Mar 25, 2025
4f83046
Added Benchmark
SKADE2303 Mar 25, 2025
8661a19
Reverted .yaml file
SKADE2303 Mar 25, 2025
13a6c12
Updated README.md
SKADE2303 Mar 25, 2025
4a47ec7
Fixed pre-commit issues
SKADE2303 Mar 25, 2025
bf21d0c
Merge remote-tracking branch 'upstream/main' into issue-82-centrality
SKADE2303 Mar 27, 2025
3 changes: 1 addition & 2 deletions .pre-commit-config.yaml
@@ -10,8 +10,7 @@ repos:
     rev: v0.6.7
     hooks:
       - id: ruff
-        args:
-          - --fix
+        args: ['--fix']
       - id: ruff-format
   - repo: local
     hooks:
1 change: 1 addition & 0 deletions README.md
@@ -16,6 +16,7 @@ nx-parallel is a NetworkX backend that uses joblib for parallelization. This pro
 - [approximate_all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L13)
 - [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20)
 - [closeness_vitality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10)
+- [degree_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/degree.py#L9)
 - [edge_betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96)
 - [is_reachable](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13)
 - [johnson](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L256)
7 changes: 7 additions & 0 deletions _nx_parallel/__init__.py
@@ -97,6 +97,13 @@ def get_info():
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
             },
         },
+        "degree_centrality": {
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/degree.py#L8",
+            "additional_docs": "Parallel computation of degree centrality. Divides nodes into chunks and computes degree centrality for each chunk concurrently.",
+            "additional_parameters": {
+                'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
+            },
+        },
         "edge_betweenness_centrality": {
             "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
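For context, the registry entry above is what NetworkX's backend dispatch machinery reads. A minimal sketch of reaching the new function through the backend keyword rather than calling nx_parallel directly (assuming nx-parallel is installed and registered as the "parallel" backend, as the benchmark below already relies on):

    import networkx as nx

    G = nx.fast_gnp_random_graph(100, 0.1, seed=42)
    # NetworkX routes this call to nx_parallel's degree_centrality
    dc = nx.degree_centrality(G, backend="parallel")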
2 changes: 1 addition & 1 deletion benchmarks/asv.conf.json
@@ -17,5 +17,5 @@
     "results_dir": "results",
     "html_dir": "html",
     "build_cache_size": 8,
-    "default_benchmark_timeout": 1200,
+    "default_benchmark_timeout": 1200
 }
9 changes: 9 additions & 0 deletions benchmarks/benchmarks/bench_centrality.py
@@ -19,3 +19,12 @@ def time_betweenness_centrality(self, backend, num_nodes, edge_prob):
     def time_edge_betweenness_centrality(self, backend, num_nodes, edge_prob):
         G = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
         _ = nx.edge_betweenness_centrality(G, backend=backend)
+
+
+class Degree(Benchmark):
+    params = [(backends), (num_nodes), (edge_prob)]
+    param_names = ["backend", "num_nodes", "edge_prob"]
+
+    def time_degree_centrality(self, backend, num_nodes, edge_prob):
+        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
+        _ = nx.degree_centrality(G, backend=backend)
9 changes: 8 additions & 1 deletion nx_parallel/algorithms/centrality/__init__.py
@@ -1 +1,8 @@
-from .betweenness import *
+from .degree import degree_centrality
+from .betweenness import betweenness_centrality, edge_betweenness_centrality
+
+__all__ = [
+    "degree_centrality",
+    "betweenness_centrality",
+    "edge_betweenness_centrality",
+]
63 changes: 63 additions & 0 deletions nx_parallel/algorithms/centrality/degree.py
@@ -0,0 +1,63 @@
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["degree_centrality"]


@nxp._configure_if_nx_active()
def degree_centrality(G, get_chunks="chunks"):
    """
    Parallel computation of degree centrality. Divides nodes into chunks
    and computes degree centrality for each chunk concurrently.

    networkx.degree_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.degree_centrality.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes as input and returns an
        iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n_jobs` number of chunks.
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    if len(G) == 0:  # Handle empty graph
        return {}

    nodes = list(G.nodes)
    n_jobs = nxp.get_n_jobs()

    # Create node subsets
    if get_chunks == "chunks":
        node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes)
    else:
        node_chunks = get_chunks(nodes)

    if not node_chunks:  # Handle empty chunks
        return {}

    # Compute degree centrality for each chunk in parallel
    dc_subs = Parallel()(
        delayed(_degree_centrality_node_subset)(G, chunk) for chunk in node_chunks
    )

    # Combine partial results
    degree_centrality_dict = dc_subs[0]
    for dc in dc_subs[1:]:
        degree_centrality_dict.update(dc)

    return degree_centrality_dict


def _degree_centrality_node_subset(G, nodes):
    part_dc = {}
    n = len(G)
    if n == 1:  # Handle single-node graph
        for node in nodes:
            part_dc[node] = 1.0
        return part_dc

    for node in nodes:
        part_dc[node] = G.degree[node] / (n - 1)
    return part_dc
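A minimal usage sketch of the function above, mirroring the tests that follow; `ParallelGraph` and `get_n_jobs` are taken from elsewhere in this PR, while the round-robin chunker is a hypothetical example of a custom `get_chunks`:

    import networkx as nx
    import nx_parallel as nxp

    G = nx.erdos_renyi_graph(50, 0.2, seed=1)
    H = nxp.ParallelGraph(G)

    # Default chunking: nodes are sliced into n_jobs contiguous chunks
    dc = nxp.degree_centrality(H)

    # Hypothetical custom chunker: round-robin assignment of nodes to chunks
    def round_robin(nodes):
        k = nxp.get_n_jobs()
        chunks = [[] for _ in range(k)]
        for i, v in enumerate(nodes):
            chunks[i % k].append(v)
        return chunks

    dc_rr = nxp.degree_centrality(H, get_chunks=round_robin)
    assert dc == dc_rr  # chunking must not change the result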
138 changes: 138 additions & 0 deletions nx_parallel/algorithms/centrality/tests/test_degree_centrality.py
@@ -0,0 +1,138 @@
import networkx as nx
import nx_parallel as nxp
import math


def test_degree_centrality_default_chunks():
    """Test degree centrality with default chunking."""
    G = nx.erdos_renyi_graph(100, 0.1, seed=42)  # Random graph with 100 nodes
    H = nxp.ParallelGraph(G)

    # Compute degree centrality using the parallel implementation
    par_dc = nxp.degree_centrality(H)

    # Compute degree centrality using NetworkX's built-in function
    expected_dc = nx.degree_centrality(G)

    # Compare the results
    for node in G.nodes:
        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)


def test_degree_centrality_custom_chunks():
    """Test degree centrality with custom chunking."""

    def get_chunk(nodes):
        num_chunks = nxp.get_n_jobs()
        chunks = [[] for _ in range(num_chunks)]
        for i, node in enumerate(nodes):
            chunks[i % num_chunks].append(node)
        return chunks

    G = nx.erdos_renyi_graph(100, 0.1, seed=42)
    H = nxp.ParallelGraph(G)

    # Compute degree centrality using custom chunking
    par_dc_chunk = nxp.degree_centrality(H, get_chunks=get_chunk)

    # Compute degree centrality using NetworkX's built-in function
    expected_dc = nx.degree_centrality(G)

    # Compare the results
    for node in G.nodes:
        assert math.isclose(par_dc_chunk[node], expected_dc[node], abs_tol=1e-16)


def test_degree_centrality_empty_graph():
    """Test degree centrality on an empty graph."""
    G = nx.Graph()  # Empty graph
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    assert par_dc == expected_dc  # Both should return an empty dictionary


def test_degree_centrality_single_node():
    """Test degree centrality on a graph with a single node."""
    G = nx.Graph()
    G.add_node(1)
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    assert par_dc == expected_dc  # Both should return {1: 1.0}


def test_degree_centrality_disconnected_graph():
    """Test degree centrality on a disconnected graph."""
    G = nx.Graph()
    G.add_nodes_from([1, 2, 3])  # Add three disconnected nodes
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    assert par_dc == expected_dc  # Both should return {1: 0.0, 2: 0.0, 3: 0.0}


def test_degree_centrality_self_loops():
    """Test degree centrality on a graph with self-loops."""
    G = nx.Graph()
    G.add_edges_from([(1, 1), (2, 2), (2, 3)])  # Add self-loops and one normal edge
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    for node in G.nodes:
        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)


def test_degree_centrality_directed_graph():
    """Test degree centrality on a directed graph."""
    G = nx.DiGraph()
    G.add_edges_from([(1, 2), (2, 3), (3, 1)])  # Create a directed cycle
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    for node in G.nodes:
        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)


def test_degree_centrality_multigraph():
    """Test degree centrality on a multigraph."""
    G = nx.MultiGraph()
    G.add_edges_from([(1, 2), (1, 2), (2, 3)])  # Add multiple edges between nodes
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    for node in G.nodes:
        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)


def test_degree_centrality_large_graph():
    """Test degree centrality on a large graph."""
    G = nx.fast_gnp_random_graph(1000, 0.01, seed=42)
    H = nxp.ParallelGraph(G)

    # Compute degree centrality
    par_dc = nxp.degree_centrality(H)
    expected_dc = nx.degree_centrality(G)

    for node in G.nodes:
        assert math.isclose(
            par_dc[node], expected_dc[node], abs_tol=1e-6
        )  # Larger tolerance for large graphs
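The graph-type tests above all share one compare-against-NetworkX pattern; a compact parametrized sketch of the same checks (hypothetical, not part of this PR's test file):

    import math
    import networkx as nx
    import nx_parallel as nxp
    import pytest


    @pytest.mark.parametrize(
        "G",
        [
            nx.erdos_renyi_graph(100, 0.1, seed=42),
            nx.DiGraph([(1, 2), (2, 3), (3, 1)]),
            nx.MultiGraph([(1, 2), (1, 2), (2, 3)]),
        ],
    )
    def test_degree_centrality_matches_networkx(G):
        H = nxp.ParallelGraph(G)
        par_dc = nxp.degree_centrality(H)
        expected_dc = nx.degree_centrality(G)
        for node in G.nodes:
            assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)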
1 change: 1 addition & 0 deletions nx_parallel/interface.py
@@ -18,6 +18,7 @@
     # Centrality
     "betweenness_centrality",
     "edge_betweenness_centrality",
+    "degree_centrality",
     # Efficiency
     "local_efficiency",
     # Shortest Paths : generic
Binary file added timing/heatmap_degree_centrality_timing.png
63 changes: 45 additions & 18 deletions timing/timing_all_functions.py
@@ -1,4 +1,5 @@
 import time
+import os

 import networkx as nx
 import pandas as pd
@@ -9,8 +10,13 @@

 # Code to create README heatmap for all functions in function_list
 heatmapDF = pd.DataFrame()
-function_list = [nx.betweenness_centrality, nx.closeness_vitality, nx.local_efficiency]
-number_of_nodes_list = [10, 20, 50, 300, 600]
+function_list = [
+    nx.betweenness_centrality,
+    nx.closeness_vitality,
+    nx.closeness_centrality,
+    nx.degree_centrality,
+]
+number_of_nodes_list = [10, 20, 50, 150, 250]

 for i in range(0, len(function_list)):
     currFun = function_list[i]
@@ -23,46 +29,64 @@

         # time both versions and update heatmapDF
         t1 = time.time()
-        c = currFun(H)
+        if currFun == nx_parallel.closeness_centrality:
+            # Explicitly pass get_chunks="chunks" for the parallel version
+            c = currFun(H, get_chunks="chunks")
+        else:
+            c = currFun(H)
         t2 = time.time()
         parallelTime = t2 - t1

         t1 = time.time()
-        c = currFun(G)
+        if currFun == nx_parallel.closeness_centrality:
+            # Explicitly pass get_chunks="chunks" for the parallel version
+            c = currFun(G, get_chunks="chunks")
+        else:
+            c = currFun(G)
         t2 = time.time()
         stdTime = t2 - t1

         timesFaster = stdTime / parallelTime
         heatmapDF.at[j, i] = timesFaster
     print("Finished " + str(currFun))

-# Code to create for row of heatmap specifically for tournaments
+# Code to handle nx.tournament.is_reachable separately
 for j in range(0, len(number_of_nodes_list)):
     num = number_of_nodes_list[j]
     G = nx.tournament.random_tournament(num)
-    H = nx_parallel.ParallelDiGraph(G)
+    H = nx_parallel.ParallelGraph(G)
     t1 = time.time()
-    c = nx.tournament.is_reachable(H, 1, num)
+    c = nx.tournament.is_reachable(
+        H, 0, num - 1
+    )  # Provide source (0) and target (num - 1)
     t2 = time.time()
     parallelTime = t2 - t1
     t1 = time.time()
-    c = nx.tournament.is_reachable(G, 1, num)
+    c = nx.tournament.is_reachable(
+        G, 0, num - 1
+    )  # Provide source (0) and target (num - 1)
     t2 = time.time()
     stdTime = t2 - t1
     timesFaster = stdTime / parallelTime
-    heatmapDF.at[j, 3] = timesFaster
+    heatmapDF.at[j, len(function_list)] = (
+        timesFaster  # Add this as a new row in the heatmap
+    )
 print("Finished nx.tournament.is_reachable")

 # plotting the heatmap with numbers and a green color scheme
 plt.figure(figsize=(20, 4))
 hm = sns.heatmap(data=heatmapDF.T, annot=True, cmap="Greens", cbar=True)

-# Remove the tick labels on both axes
-hm.set_yticklabels(
-    [
-        "betweenness_centrality",
-        "closeness_vitality",
-        "local_efficiency",
-        "tournament is_reachable",
-    ]
-)
+# Dynamically set y-axis labels based on the number of rows in heatmapDF
+labels = [
+    "betweenness_centrality",
+    "closeness_vitality",
+    "degree_centrality",
+    "tournament is_reachable",
+]
+
+# Ensure the number of labels matches the number of rows in heatmapDF
+hm.set_yticklabels(labels[: len(heatmapDF.columns)])

 # Adding x-axis labels
 hm.set_xticklabels(number_of_nodes_list)
@@ -76,3 +100,6 @@

 # displaying the plotted heatmap
 plt.tight_layout()
+
+os.makedirs("timing", exist_ok=True)
+plt.savefig("timing/" + "heatmap_all_functions_timing.png")
3 changes: 3 additions & 0 deletions timing/timing_comparison.md
@@ -22,6 +22,9 @@ betweenness_centrality
 closeness_vitality
 ![alt text](heatmap_closeness_vitality_timing.png)

+degree_centrality
+![alt text](heatmap_degree_centrality_timing.png)
+
 local_efficiency
 ![alt text](heatmap_local_efficiency_timing.png)