Skip to content

Commit ebceb19

Browse files
authored
[ENH] Refactor dual contouring with parallel and sequential triangulation modules (#26)
# Refactor Dual Contouring with Parallel Processing Support This PR refactors the dual contouring implementation to support parallel processing for multiple surfaces. The changes include: - Added parallel processing capability using `torch.multiprocessing` or standard `multiprocessing` - Split triangulation logic into separate modules for better organization: - `_parallel_triangulation.py` for multi-process surface processing - `_sequential_triangulation.py` for single-process processing - Implemented worker initialization to prevent thread oversubscription - Added logic to automatically determine when parallel processing is beneficial - Fixed gradient data handling to ensure compatible dtypes between tensors - Improved error handling and fallback to sequential processing when needed Currently, parallel processing is disabled by default as benchmarks don't show significant speedup yet, but the infrastructure is in place for future optimization.
2 parents bcac51d + 07e00f2 commit ebceb19

File tree

3 files changed

+362
-81
lines changed

3 files changed

+362
-81
lines changed
Lines changed: 93 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,68 @@
1-
import numpy
2-
import warnings
1+
import os
32
from typing import List
43

5-
import numpy as np
6-
7-
from gempy_engine.config import AvailableBackends
84
from ... import optional_dependencies
9-
105
from ...core.backend_tensor import BackendTensor
116
from ...core.data.dual_contouring_data import DualContouringData
127
from ...core.data.dual_contouring_mesh import DualContouringMesh
138
from ...core.utils import gempy_profiler_decorator
14-
from ...modules.dual_contouring.dual_contouring_interface import triangulate_dual_contouring, generate_dual_contouring_vertices
15-
from ...modules.dual_contouring.fancy_triangulation import triangulate
9+
from ...modules.dual_contouring._parallel_triangulation import _should_use_parallel_processing, _process_surface_batch, _init_worker
10+
from ...modules.dual_contouring._sequential_triangulation import _sequential_triangulation
11+
12+
# Multiprocessing imports
13+
try:
14+
import torch.multiprocessing as mp
15+
MULTIPROCESSING_AVAILABLE = True
16+
except ImportError:
17+
import multiprocessing as mp
18+
MULTIPROCESSING_AVAILABLE = False
19+
20+
21+
1622

1723

1824
@gempy_profiler_decorator
1925
def compute_dual_contouring(dc_data_per_stack: DualContouringData, left_right_codes=None, debug: bool = False) -> List[DualContouringMesh]:
2026
valid_edges_per_surface = dc_data_per_stack.valid_edges.reshape((dc_data_per_stack.n_surfaces_to_export, -1, 12))
2127

22-
# ? Is there a way to cut also the vertices?
28+
# Check if we should use parallel processing
29+
use_parallel = _should_use_parallel_processing(dc_data_per_stack.n_surfaces_to_export, BackendTensor.engine_backend)
30+
parallel_results = None
31+
32+
if use_parallel and False: # ! (Miguel Sep 25) I do not see a speedup
33+
print(f"Using parallel processing for {dc_data_per_stack.n_surfaces_to_export} surfaces")
34+
parallel_results = _parallel_process_surfaces(dc_data_per_stack, left_right_codes, debug)
35+
2336

37+
# Fall back to sequential processing
38+
print(f"Using sequential processing for {dc_data_per_stack.n_surfaces_to_export} surfaces")
2439
stack_meshes: List[DualContouringMesh] = []
2540

26-
last_surface_edge_idx = 0
2741
for i in range(dc_data_per_stack.n_surfaces_to_export):
2842
# @off
29-
valid_edges : np.ndarray = valid_edges_per_surface[i]
30-
next_surface_edge_idx: int = valid_edges.sum() + last_surface_edge_idx
31-
slice_object : slice = slice(last_surface_edge_idx, next_surface_edge_idx)
32-
last_surface_edge_idx: int = next_surface_edge_idx
33-
34-
dc_data_per_surface = DualContouringData(
35-
xyz_on_edge = dc_data_per_stack.xyz_on_edge,
36-
valid_edges = valid_edges,
37-
xyz_on_centers = dc_data_per_stack.xyz_on_centers,
38-
dxdydz = dc_data_per_stack.dxdydz,
39-
exported_fields_on_edges = dc_data_per_stack.exported_fields_on_edges,
40-
n_surfaces_to_export = dc_data_per_stack.n_surfaces_to_export,
41-
tree_depth = dc_data_per_stack.tree_depth
42-
43-
)
44-
vertices: np.ndarray = generate_dual_contouring_vertices(
45-
dc_data_per_stack = dc_data_per_surface,
46-
slice_surface = slice_object,
47-
debug = debug
48-
)
49-
50-
if left_right_codes is None:
51-
# * Legacy triangulation
52-
indices = triangulate_dual_contouring(dc_data_per_surface)
43+
if parallel_results is not None:
44+
_, vertices_numpy = _sequential_triangulation(
45+
dc_data_per_stack,
46+
debug,
47+
i,
48+
left_right_codes,
49+
valid_edges_per_surface,
50+
compute_indices=False
51+
)
52+
indices_numpy = parallel_results[i]
5353
else:
54-
# * Fancy triangulation 👗
55-
56-
# * Average gradient for the edges
57-
edges_normals = BackendTensor.t.zeros((valid_edges.shape[0], 12, 3), dtype=BackendTensor.dtype_obj)
58-
edges_normals[:] = np.nan
59-
edges_normals[valid_edges] = dc_data_per_stack.gradients[slice_object]
60-
61-
# if LEGACY:=True:
62-
if BackendTensor.engine_backend != AvailableBackends.PYTORCH:
63-
with warnings.catch_warnings():
64-
warnings.simplefilter("ignore", category=RuntimeWarning)
65-
voxel_normal = np.nanmean(edges_normals, axis=1)
66-
voxel_normal = voxel_normal[(~np.isnan(voxel_normal).any(axis=1))] # drop nans
67-
pass
68-
else:
69-
# Assuming edges_normals is a PyTorch tensor
70-
nan_mask = BackendTensor.t.isnan(edges_normals)
71-
valid_count = (~nan_mask).sum(dim=1)
72-
73-
# Replace NaNs with 0 for sum calculation
74-
safe_normals = edges_normals.clone()
75-
safe_normals[nan_mask] = 0
76-
77-
# Compute the sum of non-NaN elements
78-
sum_normals = BackendTensor.t.sum(safe_normals, 1)
79-
80-
# Calculate the mean, avoiding division by zero
81-
voxel_normal = sum_normals / valid_count.clamp(min=1)
82-
83-
# Remove rows where all elements were NaN (and hence valid_count is 0)
84-
voxel_normal = voxel_normal[valid_count > 0].reshape(-1, 3)
85-
86-
87-
valid_voxels = dc_data_per_surface.valid_voxels
88-
indices = triangulate(
89-
left_right_array = left_right_codes[valid_voxels],
90-
valid_edges = dc_data_per_surface.valid_edges[valid_voxels],
91-
tree_depth = dc_data_per_surface.tree_depth,
92-
voxel_normals = voxel_normal
54+
indices_numpy, vertices_numpy = _sequential_triangulation(
55+
dc_data_per_stack,
56+
debug,
57+
i,
58+
left_right_codes,
59+
valid_edges_per_surface,
60+
compute_indices=True
9361
)
94-
indices = BackendTensor.t.concatenate(indices, axis=0)
95-
96-
# @on
97-
vertices_numpy = BackendTensor.t.to_numpy(vertices)
98-
indices_numpy = BackendTensor.t.to_numpy(indices)
99-
62+
10063
if TRIMESH_LAST_PASS := True:
10164
vertices_numpy, indices_numpy = _last_pass(vertices_numpy, indices_numpy)
102-
65+
10366
stack_meshes.append(
10467
DualContouringMesh(
10568
vertices_numpy,
@@ -110,6 +73,55 @@ def compute_dual_contouring(dc_data_per_stack: DualContouringData, left_right_co
11073
return stack_meshes
11174

11275

76+
77+
78+
def _parallel_process_surfaces(dc_data_per_stack, left_right_codes, debug, num_workers=None, chunk_size=2):
79+
"""Process surfaces in parallel using multiprocessing."""
80+
if num_workers is None:
81+
num_workers = max(1, min(os.cpu_count() // 2, dc_data_per_stack.n_surfaces_to_export // 2))
82+
83+
# Prepare data for serialization
84+
dc_data_dict = {
85+
'xyz_on_edge' : dc_data_per_stack.xyz_on_edge,
86+
'valid_edges' : dc_data_per_stack.valid_edges,
87+
'xyz_on_centers' : dc_data_per_stack.xyz_on_centers,
88+
'dxdydz' : dc_data_per_stack.dxdydz,
89+
'exported_fields_on_edges': dc_data_per_stack.exported_fields_on_edges,
90+
'n_surfaces_to_export' : dc_data_per_stack.n_surfaces_to_export,
91+
'tree_depth' : dc_data_per_stack.tree_depth,
92+
# 'gradients': getattr(dc_data_per_stack, 'gradients', None)
93+
}
94+
95+
# Create surface index chunks
96+
surface_indices = list(range(dc_data_per_stack.n_surfaces_to_export))
97+
chunks = [surface_indices[i:i + chunk_size] for i in range(0, len(surface_indices), chunk_size)]
98+
99+
try:
100+
# Use spawn context for better PyTorch compatibility
101+
ctx = mp.get_context("spawn") if MULTIPROCESSING_AVAILABLE else mp
102+
103+
with ctx.Pool(processes=num_workers, initializer=_init_worker) as pool:
104+
# Submit all chunks
105+
async_results = []
106+
for chunk in chunks:
107+
result = pool.apply_async(
108+
_process_surface_batch,
109+
(chunk, dc_data_dict, left_right_codes, debug)
110+
)
111+
async_results.append(result)
112+
113+
# Collect results
114+
all_results = []
115+
for async_result in async_results:
116+
batch_results = async_result.get()
117+
all_results.extend(batch_results)
118+
119+
return all_results
120+
121+
except Exception as e:
122+
print(f"Parallel processing failed: {e}. Falling back to sequential processing.")
123+
return None
124+
113125
def _last_pass(vertices, indices):
114126
# Check if trimesh is available
115127
try:
@@ -118,4 +130,4 @@ def _last_pass(vertices, indices):
118130
mesh.fill_holes()
119131
return mesh.vertices, mesh.faces
120132
except ImportError:
121-
return vertices, indices
133+
return vertices, indices
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import numpy as np
2+
import os
3+
import warnings
4+
5+
from gempy_engine.config import AvailableBackends
6+
from ...core.backend_tensor import BackendTensor
7+
from ...core.data.dual_contouring_data import DualContouringData
8+
from ...modules.dual_contouring.dual_contouring_interface import triangulate_dual_contouring
9+
from ...modules.dual_contouring.fancy_triangulation import triangulate
10+
11+
# Multiprocessing imports
12+
try:
13+
import torch.multiprocessing as mp
14+
MULTIPROCESSING_AVAILABLE = True
15+
except ImportError:
16+
import multiprocessing as mp
17+
MULTIPROCESSING_AVAILABLE = False
18+
19+
20+
def _should_use_parallel_processing(n_surfaces: int, backend: AvailableBackends) -> bool:
21+
"""Determine if parallel processing should be used."""
22+
# Only use parallel processing for PyTorch CPU backend with sufficient surfaces
23+
if backend == AvailableBackends.PYTORCH and MULTIPROCESSING_AVAILABLE:
24+
# Check if we're on CPU (not GPU)
25+
try:
26+
import torch
27+
if torch.cuda.is_available():
28+
# If CUDA is available, check if default tensor type is CPU
29+
dummy = BackendTensor.t.zeros(1)
30+
is_cpu = dummy.device.type == 'cpu' if hasattr(dummy, 'device') else True
31+
else:
32+
is_cpu = True
33+
34+
# Use parallel processing if we have CPU tensors and enough surfaces to justify overhead
35+
return is_cpu and n_surfaces >= 4
36+
except ImportError:
37+
return False
38+
return False
39+
40+
41+
def _init_worker():
42+
"""Initialize worker process to avoid thread oversubscription."""
43+
# Set environment variables for NumPy/OpenMP/MKL
44+
os.environ['OMP_NUM_THREADS'] = '1'
45+
os.environ['MKL_NUM_THREADS'] = '1'
46+
os.environ['OPENBLAS_NUM_THREADS'] = '1'
47+
os.environ['NUMEXPR_NUM_THREADS'] = '1'
48+
49+
# For PyTorch, set environment variables before import
50+
os.environ['TORCH_NUM_THREADS'] = '1'
51+
os.environ['TORCH_NUM_INTEROP_THREADS'] = '1'
52+
53+
# Now import torch in the worker process
54+
try:
55+
import torch
56+
# These calls might still work if torch hasn't done any parallel work yet in this process
57+
try:
58+
torch.set_num_threads(1)
59+
torch.set_num_interop_threads(1)
60+
except RuntimeError:
61+
# If the above fails, the environment variables should handle it
62+
pass
63+
except ImportError:
64+
pass
65+
66+
67+
def _process_surface_batch(surface_indices_batch, dc_data_dict, left_right_codes, debug):
68+
"""Process a batch of surfaces in a worker process."""
69+
_init_worker()
70+
71+
# Reconstruct dc_data_per_stack from dictionary
72+
dc_data_per_stack = DualContouringData(**dc_data_dict)
73+
valid_edges_per_surface = dc_data_per_stack.valid_edges.reshape((dc_data_per_stack.n_surfaces_to_export, -1, 12))
74+
75+
batch_results = []
76+
77+
for i in surface_indices_batch:
78+
result = _process_single_surface(
79+
i, dc_data_per_stack, valid_edges_per_surface, left_right_codes, debug
80+
)
81+
batch_results.append(result)
82+
83+
return batch_results
84+
85+
def _process_single_surface(i, dc_data_per_stack, valid_edges_per_surface, left_right_codes, debug):
86+
"""Process a single surface and return vertices and indices."""
87+
try:
88+
valid_edges = valid_edges_per_surface[i]
89+
90+
# Calculate edge indices for this surface
91+
last_surface_edge_idx = sum(valid_edges_per_surface[j].sum() for j in range(i))
92+
next_surface_edge_idx = valid_edges.sum() + last_surface_edge_idx
93+
slice_object = slice(last_surface_edge_idx, next_surface_edge_idx)
94+
95+
dc_data_per_surface = DualContouringData(
96+
xyz_on_edge=dc_data_per_stack.xyz_on_edge,
97+
valid_edges=valid_edges,
98+
xyz_on_centers=dc_data_per_stack.xyz_on_centers,
99+
dxdydz=dc_data_per_stack.dxdydz,
100+
exported_fields_on_edges=dc_data_per_stack.exported_fields_on_edges,
101+
n_surfaces_to_export=dc_data_per_stack.n_surfaces_to_export,
102+
tree_depth=dc_data_per_stack.tree_depth
103+
)
104+
105+
if left_right_codes is None:
106+
# Legacy triangulation
107+
indices = triangulate_dual_contouring(dc_data_per_surface)
108+
else:
109+
edges_normals = BackendTensor.t.zeros((valid_edges.shape[0], 12, 3), dtype=BackendTensor.dtype_obj)
110+
if BackendTensor.engine_backend == AvailableBackends.PYTORCH:
111+
edges_normals[:] = float('nan') # Use Python float nan instead of np.nan
112+
else:
113+
edges_normals[:] = np.nan
114+
115+
# Get gradient data
116+
gradient_data = dc_data_per_stack.gradients[slice_object]
117+
118+
# Fix dtype mismatch by ensuring compatible dtypes
119+
if BackendTensor.engine_backend == AvailableBackends.PYTORCH:
120+
if hasattr(gradient_data, 'dtype') and hasattr(edges_normals, 'dtype'):
121+
if gradient_data.dtype != edges_normals.dtype:
122+
gradient_data = gradient_data.to(edges_normals.dtype)
123+
124+
edges_normals[valid_edges] = gradient_data
125+
126+
if BackendTensor.engine_backend != AvailableBackends.PYTORCH:
127+
with warnings.catch_warnings():
128+
warnings.simplefilter("ignore", category=RuntimeWarning)
129+
voxel_normal = np.nanmean(edges_normals, axis=1)
130+
voxel_normal = voxel_normal[(~np.isnan(voxel_normal).any(axis=1))]
131+
else:
132+
# PyTorch tensor operations
133+
nan_mask = BackendTensor.t.isnan(edges_normals)
134+
valid_count = (~nan_mask).sum(dim=1)
135+
safe_normals = edges_normals.clone()
136+
safe_normals[nan_mask] = 0
137+
sum_normals = BackendTensor.t.sum(safe_normals, 1)
138+
voxel_normal = sum_normals / valid_count.clamp(min=1)
139+
voxel_normal = voxel_normal[valid_count > 0].reshape(-1, 3)
140+
141+
valid_voxels = dc_data_per_surface.valid_voxels
142+
left_right_per_surface = left_right_codes[valid_voxels]
143+
valid_voxels_per_surface = dc_data_per_surface.valid_edges[valid_voxels]
144+
tree_depth_per_surface = dc_data_per_surface.tree_depth
145+
146+
indices = triangulate(
147+
left_right_array=left_right_per_surface,
148+
valid_edges=valid_voxels_per_surface,
149+
tree_depth=tree_depth_per_surface,
150+
voxel_normals=voxel_normal
151+
)
152+
indices = BackendTensor.t.concatenate(indices, axis=0)
153+
154+
# vertices_numpy = BackendTensor.t.to_numpy(vertices)
155+
indices_numpy = BackendTensor.t.to_numpy(indices)
156+
return indices_numpy
157+
158+
except Exception as e:
159+
print(f"ERROR in _process_single_surface for surface {i}: {e}")
160+
import traceback
161+
traceback.print_exc()
162+
raise
163+

0 commit comments

Comments
 (0)