diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 22681389..d14eea98 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -10,8 +10,7 @@ repos:
     rev: v0.6.7
     hooks:
       - id: ruff
-        args:
-          - --fix
+        args: ['--fix']
       - id: ruff-format
   - repo: local
     hooks:
diff --git a/README.md b/README.md
index 9d54ad19..561935ee 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,7 @@ nx-parallel is a NetworkX backend that uses joblib for parallelization. This pro
 - [approximate_all_pairs_node_connectivity](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/approximation/connectivity.py#L13)
 - [betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L20)
 - [closeness_vitality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/vitality.py#L10)
+- [degree_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/degree.py#L9)
 - [edge_betweenness_centrality](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L96)
 - [is_reachable](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/tournament.py#L13)
 - [johnson](https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/shortest_paths/weighted.py#L256)
diff --git a/_nx_parallel/__init__.py b/_nx_parallel/__init__.py
index b4cd39a0..e0b85624 100644
--- a/_nx_parallel/__init__.py
+++ b/_nx_parallel/__init__.py
@@ -97,6 +97,13 @@ def get_info():
                 'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
             },
         },
+        "degree_centrality": {
+            "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/degree.py#L8",
+            "additional_docs": "Parallel computation of degree centrality. Divides nodes into chunks and computes degree centrality for each chunk concurrently.",
+            "additional_parameters": {
+                'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `nodes` into `n_jobs` number of chunks."
+            },
+        },
         "edge_betweenness_centrality": {
             "url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/centrality/betweenness.py#L99",
             "additional_docs": "The parallel computation is implemented by dividing the nodes into chunks and computing edge betweenness centrality for each chunk concurrently.",
diff --git a/benchmarks/asv.conf.json b/benchmarks/asv.conf.json
index a12982fb..b5b49eef 100644
--- a/benchmarks/asv.conf.json
+++ b/benchmarks/asv.conf.json
@@ -17,5 +17,5 @@
     "results_dir": "results",
     "html_dir": "html",
     "build_cache_size": 8,
-    "default_benchmark_timeout": 1200,
+    "default_benchmark_timeout": 1200
 }
diff --git a/benchmarks/benchmarks/bench_centrality.py b/benchmarks/benchmarks/bench_centrality.py
index 74cba68e..26b322a6 100644
--- a/benchmarks/benchmarks/bench_centrality.py
+++ b/benchmarks/benchmarks/bench_centrality.py
@@ -19,3 +19,12 @@ def time_betweenness_centrality(self, backend, num_nodes, edge_prob):
     def time_edge_betweenness_centrality(self, backend, num_nodes, edge_prob):
         G = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
         _ = nx.edge_betweenness_centrality(G, backend=backend)
+
+
+class Degree(Benchmark):
+    params = [(backends), (num_nodes), (edge_prob)]
+    param_names = ["backend", "num_nodes", "edge_prob"]
+
+    def time_degree_centrality(self, backend, num_nodes, edge_prob):
+        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
+        _ = nx.degree_centrality(G, backend=backend)
diff --git a/nx_parallel/algorithms/centrality/__init__.py b/nx_parallel/algorithms/centrality/__init__.py
index cf7adb68..0dade124 100644
--- a/nx_parallel/algorithms/centrality/__init__.py
+++ b/nx_parallel/algorithms/centrality/__init__.py
@@ -1 +1,8 @@
-from .betweenness import *
+from .degree import degree_centrality
+from .betweenness import betweenness_centrality, edge_betweenness_centrality
+
+__all__ = [
+    "degree_centrality",
+    "betweenness_centrality",
+    "edge_betweenness_centrality",
+]
diff --git a/nx_parallel/algorithms/centrality/degree.py b/nx_parallel/algorithms/centrality/degree.py
new file mode 100644
index 00000000..9c23ecea
--- /dev/null
+++ b/nx_parallel/algorithms/centrality/degree.py
@@ -0,0 +1,63 @@
+from joblib import Parallel, delayed
+import nx_parallel as nxp
+
+__all__ = ["degree_centrality"]
+
+
+@nxp._configure_if_nx_active()
+def degree_centrality(G, get_chunks="chunks"):
+    """
+    Parallel computation of degree centrality. Divides nodes into chunks
+    and computes degree centrality for each chunk concurrently.
+
+    networkx.degree_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.degree_centrality.html
+
+    Parameters
+    ----------
+    get_chunks : str, function (default = "chunks")
+        A function that takes in a list of all the nodes as input and returns an
+        iterable `node_chunks`. The default chunking is done by slicing the
+        `nodes` into `n_jobs` number of chunks.
+ """ + if hasattr(G, "graph_object"): + G = G.graph_object + + if len(G) == 0: # Handle empty graph + return {} + + nodes = list(G.nodes) + n_jobs = nxp.get_n_jobs() + + # Create node subsets + if get_chunks == "chunks": + node_chunks = nxp.create_iterables(G, "node", n_jobs, nodes) + else: + node_chunks = get_chunks(nodes) + + if not node_chunks: # Handle empty chunks + return {} + + # Compute degree centrality for each chunk in parallel + dc_subs = Parallel()( + delayed(_degree_centrality_node_subset)(G, chunk) for chunk in node_chunks + ) + + # Combine partial results + degree_centrality_dict = dc_subs[0] + for dc in dc_subs[1:]: + degree_centrality_dict.update(dc) + + return degree_centrality_dict + + +def _degree_centrality_node_subset(G, nodes): + part_dc = {} + n = len(G) + if n == 1: # Handle single-node graph + for node in nodes: + part_dc[node] = 1.0 + return part_dc + + for node in nodes: + part_dc[node] = G.degree[node] / (n - 1) + return part_dc diff --git a/nx_parallel/algorithms/centrality/tests/test_degree_centrality.py b/nx_parallel/algorithms/centrality/tests/test_degree_centrality.py new file mode 100644 index 00000000..911847f2 --- /dev/null +++ b/nx_parallel/algorithms/centrality/tests/test_degree_centrality.py @@ -0,0 +1,138 @@ +import networkx as nx +import nx_parallel as nxp +import math + + +def test_degree_centrality_default_chunks(): + """Test degree centrality with default chunking.""" + G = nx.erdos_renyi_graph(100, 0.1, seed=42) # Random graph with 100 nodes + H = nxp.ParallelGraph(G) + + # Compute degree centrality using the parallel implementation + par_dc = nxp.degree_centrality(H) + + # Compute degree centrality using NetworkX's built-in function + expected_dc = nx.degree_centrality(G) + + # Compare the results + for node in G.nodes: + assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16) + + +def test_degree_centrality_custom_chunks(): + """Test degree centrality with custom chunking.""" + + def get_chunk(nodes): + num_chunks = nxp.get_n_jobs() + chunks = [[] for _ in range(num_chunks)] + for i, node in enumerate(nodes): + chunks[i % num_chunks].append(node) + return chunks + + G = nx.erdos_renyi_graph(100, 0.1, seed=42) + H = nxp.ParallelGraph(G) + + # Compute degree centrality using custom chunking + par_dc_chunk = nxp.degree_centrality(H, get_chunks=get_chunk) + + # Compute degree centrality using NetworkX's built-in function + expected_dc = nx.degree_centrality(G) + + # Compare the results + for node in G.nodes: + assert math.isclose(par_dc_chunk[node], expected_dc[node], abs_tol=1e-16) + + +def test_degree_centrality_empty_graph(): + """Test degree centrality on an empty graph.""" + G = nx.Graph() # Empty graph + H = nxp.ParallelGraph(G) + + # Compute degree centrality + par_dc = nxp.degree_centrality(H) + expected_dc = nx.degree_centrality(G) + + assert par_dc == expected_dc # Both should return an empty dictionary + + +def test_degree_centrality_single_node(): + """Test degree centrality on a graph with a single node.""" + G = nx.Graph() + G.add_node(1) + H = nxp.ParallelGraph(G) + + # Compute degree centrality + par_dc = nxp.degree_centrality(H) + expected_dc = nx.degree_centrality(G) + + assert par_dc == expected_dc # Both should return {1: 0.0} + + +def test_degree_centrality_disconnected_graph(): + """Test degree centrality on a disconnected graph.""" + G = nx.Graph() + G.add_nodes_from([1, 2, 3]) # Add three disconnected nodes + H = nxp.ParallelGraph(G) + + # Compute degree centrality + par_dc = nxp.degree_centrality(H) + 
+    expected_dc = nx.degree_centrality(G)
+
+    assert par_dc == expected_dc  # Both should return {1: 0.0, 2: 0.0, 3: 0.0}
+
+
+def test_degree_centrality_self_loops():
+    """Test degree centrality on a graph with self-loops."""
+    G = nx.Graph()
+    G.add_edges_from([(1, 1), (2, 2), (2, 3)])  # Add self-loops and one normal edge
+    H = nxp.ParallelGraph(G)
+
+    # Compute degree centrality
+    par_dc = nxp.degree_centrality(H)
+    expected_dc = nx.degree_centrality(G)
+
+    for node in G.nodes:
+        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)
+
+
+def test_degree_centrality_directed_graph():
+    """Test degree centrality on a directed graph."""
+    G = nx.DiGraph()
+    G.add_edges_from([(1, 2), (2, 3), (3, 1)])  # Create a directed cycle
+    H = nxp.ParallelGraph(G)
+
+    # Compute degree centrality
+    par_dc = nxp.degree_centrality(H)
+    expected_dc = nx.degree_centrality(G)
+
+    for node in G.nodes:
+        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)
+
+
+def test_degree_centrality_multigraph():
+    """Test degree centrality on a multigraph."""
+    G = nx.MultiGraph()
+    G.add_edges_from([(1, 2), (1, 2), (2, 3)])  # Add multiple edges between nodes
+    H = nxp.ParallelGraph(G)
+
+    # Compute degree centrality
+    par_dc = nxp.degree_centrality(H)
+    expected_dc = nx.degree_centrality(G)
+
+    for node in G.nodes:
+        assert math.isclose(par_dc[node], expected_dc[node], abs_tol=1e-16)
+
+
+def test_degree_centrality_large_graph():
+    """Test degree centrality on a large graph."""
+    G = nx.fast_gnp_random_graph(1000, 0.01, seed=42)
+    H = nxp.ParallelGraph(G)
+
+    # Compute degree centrality
+    par_dc = nxp.degree_centrality(H)
+    expected_dc = nx.degree_centrality(G)
+
+    for node in G.nodes:
+        assert math.isclose(
+            par_dc[node], expected_dc[node], abs_tol=1e-6
+        )  # Larger tolerance for large graphs
diff --git a/nx_parallel/interface.py b/nx_parallel/interface.py
index 38af8c73..c71f58ce 100644
--- a/nx_parallel/interface.py
+++ b/nx_parallel/interface.py
@@ -18,6 +18,7 @@
     # Centrality
     "betweenness_centrality",
     "edge_betweenness_centrality",
+    "degree_centrality",
     # Efficiency
     "local_efficiency",
     # Shortest Paths : generic
diff --git a/timing/heatmap_degree_centrality_timing.png b/timing/heatmap_degree_centrality_timing.png
new file mode 100644
index 00000000..655108f2
Binary files /dev/null and b/timing/heatmap_degree_centrality_timing.png differ
diff --git a/timing/timing_all_functions.py b/timing/timing_all_functions.py
index fe1cba97..c72deb66 100644
--- a/timing/timing_all_functions.py
+++ b/timing/timing_all_functions.py
@@ -1,4 +1,5 @@
 import time
+import os
 
 import networkx as nx
 import pandas as pd
@@ -9,8 +10,13 @@
 # Code to create README heatmap for all functions in function_list
 heatmapDF = pd.DataFrame()
-function_list = [nx.betweenness_centrality, nx.closeness_vitality, nx.local_efficiency]
-number_of_nodes_list = [10, 20, 50, 300, 600]
+function_list = [
+    nx.betweenness_centrality,
+    nx.closeness_vitality,
+    nx.closeness_centrality,
+    nx.degree_centrality,
+]
+number_of_nodes_list = [10, 20, 50, 150, 250]
 
 for i in range(0, len(function_list)):
     currFun = function_list[i]
@@ -23,46 +29,64 @@
         # time both versions and update heatmapDF
         t1 = time.time()
-        c = currFun(H)
+        if currFun is nx.closeness_centrality:
+            # Explicitly pass get_chunks="chunks" for the parallel version
+            c = currFun(H, get_chunks="chunks")
+        else:
+            c = currFun(H)
         t2 = time.time()
         parallelTime = t2 - t1
+
         t1 = time.time()
+        # The standard NetworkX call takes no get_chunks argument
get_chunks="chunks" for the parallel version + c = currFun(G, get_chunks="chunks") + else: + c = currFun(G) t2 = time.time() stdTime = t2 - t1 + timesFaster = stdTime / parallelTime heatmapDF.at[j, i] = timesFaster print("Finished " + str(currFun)) -# Code to create for row of heatmap specifically for tournaments +# Code to handle nx.tournament.is_reachable separately for j in range(0, len(number_of_nodes_list)): num = number_of_nodes_list[j] G = nx.tournament.random_tournament(num) - H = nx_parallel.ParallelDiGraph(G) + H = nx_parallel.ParallelGraph(G) t1 = time.time() - c = nx.tournament.is_reachable(H, 1, num) + c = nx.tournament.is_reachable( + H, 0, num - 1 + ) # Provide source (0) and target (num - 1) t2 = time.time() parallelTime = t2 - t1 t1 = time.time() - c = nx.tournament.is_reachable(G, 1, num) + c = nx.tournament.is_reachable( + G, 0, num - 1 + ) # Provide source (0) and target (num - 1) t2 = time.time() stdTime = t2 - t1 timesFaster = stdTime / parallelTime - heatmapDF.at[j, 3] = timesFaster + heatmapDF.at[j, len(function_list)] = ( + timesFaster # Add this as a new row in the heatmap + ) + print("Finished nx.tournament.is_reachable") # plotting the heatmap with numbers and a green color scheme plt.figure(figsize=(20, 4)) hm = sns.heatmap(data=heatmapDF.T, annot=True, cmap="Greens", cbar=True) -# Remove the tick labels on both axes -hm.set_yticklabels( - [ - "betweenness_centrality", - "closeness_vitality", - "local_efficiency", - "tournament is_reachable", - ] -) +# Dynamically set y-axis labels based on the number of rows in heatmapDF +labels = [ + "betweenness_centrality", + "closeness_vitality", + "degree_centrality", + "tournament is_reachable", +] + +# Ensure the number of labels matches the number of rows in heatmapDF +hm.set_yticklabels(labels[: len(heatmapDF.columns)]) # Adding x-axis labels hm.set_xticklabels(number_of_nodes_list) @@ -76,3 +100,6 @@ # displaying the plotted heatmap plt.tight_layout() + +os.makedirs("timing", exist_ok=True) +plt.savefig("timing/" + "heatmap_all_functions_timing.png") diff --git a/timing/timing_comparison.md b/timing/timing_comparison.md index ea331708..985dfd9b 100644 --- a/timing/timing_comparison.md +++ b/timing/timing_comparison.md @@ -22,6 +22,9 @@ betweenness_centrality closeness_vitality ![alt text](heatmap_closeness_vitality_timing.png) +degree_centrality +![alt text](heatmap_degree_centrality_timing.png) + local_efficiency ![alt text](heatmap_local_efficiency_timing.png) diff --git a/timing/timing_individual_function.py b/timing/timing_individual_function.py index 809315d0..3f57055e 100644 --- a/timing/timing_individual_function.py +++ b/timing/timing_individual_function.py @@ -1,110 +1,59 @@ import time - import networkx as nx +import nx_parallel as nxp import pandas as pd import seaborn as sns from matplotlib import pyplot as plt -import nx_parallel as nxp - # Code to create README heatmaps for individual function currFun heatmapDF = pd.DataFrame() -# for bipartite graphs -# n = [50, 100, 200, 400] -# m = [25, 50, 100, 200] -number_of_nodes_list = [200, 400, 800, 1600] -weighted = False -pList = [1, 0.8, 0.6, 0.4, 0.2] -currFun = nx.tournament.is_reachable -""" -for p in pList: - for num in range(len(number_of_nodes_list)): - # create original and parallel graphs - G = nx.fast_gnp_random_graph( - number_of_nodes_list[num], p, seed=42, directed=True - ) - - - # for bipartite.node_redundancy - G = nx.bipartite.random_graph(n[num], m[num], p, seed=42, directed=True) - for i in G.nodes: - l = list(G.neighbors(i)) - if len(l) == 
-                v = random.choice(list(G.nodes) - [i,])
-                G.add_edge(i, v)
-                G.add_edge(i, random.choice([node for node in G.nodes if node != i]))
-            elif len(l) == 1:
-                G.add_edge(i, random.choice([node for node in G.nodes if node != i and node not in list(G.neighbors(i))]))
+number_of_nodes_list = [10, 50, 100, 200, 400]
+pList = [1, 0.8, 0.6, 0.4, 0.2]  # List of edge probabilities
+currFun = nxp.degree_centrality
 
-        # for weighted graphs
-        if weighted:
-            random.seed(42)
-            for u, v in G.edges():
-                G[u][v]["weight"] = random.random()
+for p in pList:  # Loop through edge probabilities
+    for num in number_of_nodes_list:  # Loop through number of nodes
+        print(f"Processing graph with {num} nodes and edge probability {p}")
 
+        # Create original and parallel graphs
+        G = nx.fast_gnp_random_graph(num, p, seed=42, directed=True)
         H = nxp.ParallelGraph(G)
 
-        # time both versions and update heatmapDF
+        # Time the parallel version
         t1 = time.time()
         c1 = currFun(H)
-        if isinstance(c1, types.GeneratorType):
-            d = dict(c1)
         t2 = time.time()
         parallelTime = t2 - t1
+
+        # Time the standard version (currFun is the parallel implementation,
+        # so the baseline must call NetworkX's own function)
         t1 = time.time()
-        c2 = currFun(G)
-        if isinstance(c2, types.GeneratorType):
-            d = dict(c2)
+        c2 = nx.degree_centrality(G)
         t2 = time.time()
         stdTime = t2 - t1
-        timesFaster = stdTime / parallelTime
-        heatmapDF.at[number_of_nodes_list[num], p] = timesFaster
-        print("Finished " + str(currFun))
-"""
-# Code to create for row of heatmap specifically for tournaments
-for num in number_of_nodes_list:
-    print(num)
-    G = nx.tournament.random_tournament(num, seed=42)
-    H = nxp.ParallelGraph(G)
-    t1 = time.time()
-    c = currFun(H, 1, num)
-    t2 = time.time()
-    parallelTime = t2 - t1
-    print(parallelTime)
-    t1 = time.time()
-    c = currFun(G, 1, num)
-    t2 = time.time()
-    stdTime = t2 - t1
-    print(stdTime)
-    timesFaster = stdTime / parallelTime
-    heatmapDF.at[num, 3] = timesFaster
-    print("Finished " + str(currFun))
 
+        # Calculate speedup
+        timesFaster = stdTime / parallelTime
+        heatmapDF.at[num, p] = timesFaster
+        print(f"Finished {currFun.__name__} for {num} nodes and p={p}")
 
-# plotting the heatmap with numbers and a green color scheme
+# Plotting the heatmap with numbers and a green color scheme
 plt.figure(figsize=(20, 4))
 hm = sns.heatmap(data=heatmapDF.T, annot=True, cmap="Greens", cbar=True)
 
-# Remove the tick labels on both axes
-hm.set_yticklabels(
-    [
-        3,
-    ]
-)
-
-# Adding x-axis labels
+# Adding x-axis and y-axis labels
 hm.set_xticklabels(number_of_nodes_list)
+hm.set_yticklabels(pList)
 
-# Rotating the x-axis labels for better readability (optional)
+# Rotating the x-axis labels for better readability
 plt.xticks(rotation=45)
 plt.yticks(rotation=20)
 
 plt.title(
-    "Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to NetworkX"
+    f"Speedups of {currFun.__name__} compared to NetworkX for Different Edge Probabilities"
 )
 plt.xlabel("Number of Vertices")
 plt.ylabel("Edge Probability")
 
-print(currFun.__name__)
-# displaying the plotted heatmap
+# Save and display the heatmap
 plt.tight_layout()
-plt.savefig("timing/" + "heatmap_" + currFun.__name__ + "_timing.png")
+plt.savefig(f"timing/heatmap_{currFun.__name__}_timing.png")
+plt.show()
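
Usage sketch for reviewers: a minimal illustration of how the `degree_centrality` function added by this patch can be reached, either through NetworkX's backend dispatch or by calling nx-parallel directly. This is not part of the patch; the round-robin chunker mirrors `test_degree_centrality_custom_chunks` above, and `round_robin_chunks` is a hypothetical helper name.

```python
import networkx as nx
import nx_parallel as nxp

G = nx.fast_gnp_random_graph(500, 0.1, seed=42)

# Option 1: wrap the graph so NetworkX dispatches to the parallel backend.
H = nxp.ParallelGraph(G)
dc = nx.degree_centrality(H)

# Option 2: keep a plain NetworkX graph and select the backend per call,
# as the benchmarks in this patch do.
dc = nx.degree_centrality(G, backend="parallel")


# A custom chunker can replace the default contiguous slicing; this
# round-robin split distributes nodes evenly across n_jobs chunks.
def round_robin_chunks(nodes):
    n_jobs = nxp.get_n_jobs()
    chunks = [[] for _ in range(n_jobs)]
    for i, node in enumerate(nodes):
        chunks[i % n_jobs].append(node)
    return chunks


dc = nxp.degree_centrality(H, get_chunks=round_robin_chunks)
```

The default chunking slices the node list into `n_jobs` contiguous chunks, per the docstring; a custom `get_chunks` is mainly useful when work per node is uneven and needs to be balanced across jobs.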