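"""Dual contouring (v2): mesh extraction per surface, with an optional multiprocessing path."""
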
import os
from typing import List

from ._gen_vertices import _generate_vertices
from ._parallel_triangulation import _should_use_parallel_processing, _init_worker
from ._sequential_triangulation import _compute_triangulation
from ... import optional_dependencies
from ...core.backend_tensor import BackendTensor
from ...core.data.dual_contouring_data import DualContouringData
from ...core.data.dual_contouring_mesh import DualContouringMesh
from ...core.utils import gempy_profiler_decorator

# Multiprocessing imports
try:
    import torch.multiprocessing as mp

    MULTIPROCESSING_AVAILABLE = True
except ImportError:
    import multiprocessing as mp

    MULTIPROCESSING_AVAILABLE = False


@gempy_profiler_decorator
def compute_dual_contouring_v2(dc_data_list: list[DualContouringData]) -> List[DualContouringMesh]:
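    """Compute dual contouring meshes for a list of surfaces, trying the parallel
    path first and falling back to sequential processing otherwise."""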
    parallel_results = _parallel_process(dc_data_list)

    if parallel_results is not None:
        return parallel_results

    # Fall back to sequential processing
    print(f"Using sequential processing for {len(dc_data_list)} surfaces")
    stack_meshes: List[DualContouringMesh] = []

    for dc_data in dc_data_list:
        mesh = _process_one_surface(dc_data, dc_data.left_right_codes)
        stack_meshes.append(mesh)
    return stack_meshes


def _parallel_process(dc_data_list: list[DualContouringData]):
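    """Run the parallel path when it is enabled and worthwhile; return None so the
    caller falls back to sequential processing."""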
    # Check if we should use parallel processing
    n_surfaces_to_export = len(dc_data_list)
    use_parallel = _should_use_parallel_processing(n_surfaces_to_export, BackendTensor.engine_backend)
    parallel_results = None

    if use_parallel and False:  # ! (Miguel Sep 25) Disabled: no measurable speedup observed
        print(f"Using parallel processing for {n_surfaces_to_export} surfaces")
        parallel_results = _parallel_process_surfaces_v2(dc_data_list)

    return parallel_results


def _parallel_process_surfaces_v2(dc_data_list: list[DualContouringData], num_workers=None, chunk_size=2):
    """Process surfaces in parallel using multiprocessing."""
    n_surfaces = len(dc_data_list)

    if num_workers is None:
        num_workers = max(1, min(os.cpu_count() // 2, n_surfaces // 2))

    # Prepare data for serialization - convert each DualContouringData to a dict
    dc_data_dicts = []
    for dc_data in dc_data_list:
        dc_data_dict = {
            'xyz_on_edge'         : dc_data.xyz_on_edge,
            'valid_edges'         : dc_data.valid_edges,
            'xyz_on_centers'      : dc_data.xyz_on_centers,
            'dxdydz'              : dc_data.dxdydz,
            'gradients'           : dc_data.gradients,
            'left_right_codes'    : dc_data.left_right_codes,
            'n_surfaces_to_export': dc_data.n_surfaces_to_export,
            'tree_depth'          : dc_data.tree_depth
        }
        dc_data_dicts.append(dc_data_dict)

    # Create surface index chunks
    surface_indices = list(range(n_surfaces))
    chunks = [surface_indices[i:i + chunk_size] for i in range(0, len(surface_indices), chunk_size)]

    try:
        # Use the fork context from torch.multiprocessing when PyTorch is available,
        # otherwise fall back to the standard multiprocessing module
        ctx = mp.get_context("fork") if MULTIPROCESSING_AVAILABLE else mp

        with ctx.Pool(processes=num_workers, initializer=_init_worker) as pool:
            # Submit all chunks
            async_results = []
            for chunk in chunks:
                result = pool.apply_async(
                    _process_surface_batch_v2,
                    (chunk, dc_data_dicts)
                )
                async_results.append(result)

            # Collect results
            all_results = []
            for async_result in async_results:
                batch_results = async_result.get()
                all_results.extend(batch_results)

            return all_results

    except Exception as e:
        print(f"Parallel processing failed: {e}. Falling back to sequential processing.")
        return None


def _process_surface_batch_v2(surface_indices, dc_data_dicts):
    """Process a batch of surfaces. This function runs in worker processes."""
    results = []

    for idx in surface_indices:
        dc_data_dict = dc_data_dicts[idx]

        # Reconstruct DualContouringData from dict
        dc_data = DualContouringData(
            xyz_on_edge=dc_data_dict['xyz_on_edge'],
            valid_edges=dc_data_dict['valid_edges'],
            xyz_on_centers=dc_data_dict['xyz_on_centers'],
            dxdydz=dc_data_dict['dxdydz'],
            gradients=dc_data_dict['gradients'],
            left_right_codes=dc_data_dict['left_right_codes'],
            n_surfaces_to_export=dc_data_dict['n_surfaces_to_export'],
            tree_depth=dc_data_dict['tree_depth']
        )
        # Process the surface
        mesh = _process_one_surface(dc_data, dc_data.left_right_codes)
        results.append(mesh)

    return results


def _process_one_surface(dc_data: DualContouringData, left_right_codes) -> DualContouringMesh:
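    """Build the mesh for a single surface: generate vertices, assemble per-edge
    normals, triangulate, and optionally run a trimesh clean-up pass."""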
    vertices = _generate_vertices(dc_data, False, None)

    # * Average gradient for the edges
    valid_edges = dc_data.valid_edges
    edges_normals = BackendTensor.t.zeros((valid_edges.shape[0], 12, 3), dtype=BackendTensor.dtype_obj)
    edges_normals[:] = 0
    edges_normals[valid_edges] = dc_data.gradients

    indices_numpy = _compute_triangulation(
        dc_data_per_surface=dc_data,
        left_right_codes=left_right_codes,
        edges_normals=edges_normals,
        vertex=vertices
    )

    vertices_numpy = BackendTensor.t.to_numpy(vertices)
    if TRIMESH_LAST_PASS := True:
        vertices_numpy, indices_numpy = _last_pass(vertices_numpy, indices_numpy)

    mesh = DualContouringMesh(vertices_numpy, indices_numpy, dc_data)
    return mesh


def _last_pass(vertices, indices):
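    """Post-process the mesh with trimesh (fill holes); return the vertices and
    indices unchanged if trimesh is not installed."""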
    # Check if trimesh is available
    try:
        trimesh = optional_dependencies.require_trimesh()
        mesh = trimesh.Trimesh(vertices=vertices, faces=indices)
        mesh.fill_holes()
        return mesh.vertices, mesh.faces
    except ImportError:
        return vertices, indices