diff --git a/doc/source/changelog/1028.added.md b/doc/source/changelog/1028.added.md new file mode 100644 index 000000000..8d63d46e9 --- /dev/null +++ b/doc/source/changelog/1028.added.md @@ -0,0 +1 @@ +Mesh blood pools diff --git a/src/ansys/health/heart/models.py b/src/ansys/health/heart/models.py index 23696dcc1..7afb923bf 100644 --- a/src/ansys/health/heart/models.py +++ b/src/ansys/health/heart/models.py @@ -249,7 +249,7 @@ def __init__(self, working_directory: pathlib.Path | str = None) -> None: self.mesh = Mesh() """Computational mesh.""" - self.fluid_mesh = Mesh() + self._fluid_mesh = Mesh() """Generated fluid mesh.""" #! TODO: non-functional flag. Remove or replace. @@ -527,77 +527,46 @@ def mesh_volume( return self.mesh - def _mesh_fluid_volume(self, remesh_caps: bool = True): - """Generate a volume mesh of the cavities. - - Parameters - ---------- - remesh_caps : bool, default: True - Whether to remesh the caps of each cavity. - """ - # get all relevant boundaries for the fluid cavities: + def _mesh_fluid_volume(self) -> Mesh: + """Generate a volume mesh of the cavities.""" + # get all relevant boundaries for the fluid: + # NOTE: relies on substrings to select the right surfaces/boundaries. substrings_include = ["endocardium", "valve-plane", "septum"] - substrings_include_re = "|".join(substrings_include) + substrings_include_regex = "|".join(substrings_include) - substrings_exlude = ["pulmonary-valve", "aortic-valve"] - substrings_exlude_re = "|".join(substrings_exlude) + substrings_exlude = [CapType.PULMONARY_VALVE.value, CapType.AORTIC_VALVE.value] + substrings_exlude_regex = "|".join(substrings_exlude) boundaries_fluid = [ - b for b in self.mesh._surfaces if re.search(substrings_include_re, b.name) + b for b in self.mesh._surfaces if re.search(substrings_include_regex, b.name) ] boundaries_exclude = [ - b.name for b in boundaries_fluid if re.search(substrings_exlude_re, b.name) + b.name for b in boundaries_fluid if re.search(substrings_exlude_regex, b.name) ] boundaries_fluid = [b for b in boundaries_fluid if b.name not in boundaries_exclude] - caps = [c._mesh for p in self.parts for c in p.caps] - if len(boundaries_fluid) == 0: - LOGGER.debug( - "Meshing of fluid cavities is not possible. No fluid surfaces are detected." - ) - return - - if len(caps) == 0: - LOGGER.debug("Meshing of fluid cavities is not possible. No caps are detected.") + LOGGER.error("Meshing of blood pool is not possible. No fluid surfaces detected.") return - LOGGER.info("Meshing fluid cavities...") + LOGGER.info("Meshing blood pool...") - # mesh the fluid cavities - fluid_mesh = mesher._mesh_fluid_cavities( - boundaries_fluid, caps, self.workdir, remesh_caps=remesh_caps - ) - - LOGGER.info(f"Meshed {len(fluid_mesh.cell_zones)} fluid regions...") - - # add part-ids - cz_ids = np.sort([cz.id for cz in fluid_mesh.cell_zones]) - - # TODO: this offset is arbitrary. - offset = 10000 - new_ids = np.arange(cz_ids.shape[0]) + offset - czid_to_pid = {cz_id: new_ids[ii] for ii, cz_id in enumerate(cz_ids)} + fluid_mesh = mesher._mesh_fluid_from_boundaries(boundaries_fluid, self.workdir, mesh_size=1) - for cz in fluid_mesh.cell_zones: - cz.id = czid_to_pid[cz.id] + # update patches with appropriate cap name, based on centroid location. + model_caps = [c for part in self.parts for c in part.caps] + cap_centroids = np.array([cap.centroid for cap in model_caps]) + cap_names = [cap.name for cap in model_caps] - fluid_mesh._fix_negative_cells() - fluid_mesh_vtk = fluid_mesh._to_vtk(add_cells=True, add_faces=False) + patches = {sid: sn for sid, sn in fluid_mesh._surface_id_to_name.items() if "patch" in sn} + for patch_id in patches.keys(): + patch_mesh = fluid_mesh.get_surface(patch_id) + cap_index = np.argmin(np.linalg.norm(cap_centroids - patch_mesh.center, axis=1)) + fluid_mesh._surface_id_to_name[patch_id] = cap_names[cap_index] - fluid_mesh_vtk.cell_data["_volume-id"] = fluid_mesh_vtk.cell_data["cell-zone-ids"] + self._fluid_mesh = fluid_mesh - boundaries = [ - SurfaceMesh(name=fz.name, triangles=fz.faces, nodes=fluid_mesh.nodes, id=fz.id) - for fz in fluid_mesh.face_zones - if "interior" not in fz.name - ] - - self.fluid_mesh = Mesh(fluid_mesh_vtk) - for boundary in boundaries: - self.fluid_mesh.add_surface(boundary, boundary.id, boundary.name) - - return + return fluid_mesh def get_part(self, name: str, by_substring: bool = False) -> anatomy.Part | None: """Get a specific part based on a part name.""" diff --git a/src/ansys/health/heart/pre/mesher.py b/src/ansys/health/heart/pre/mesher.py index 1f7b9861f..b2e57e230 100644 --- a/src/ansys/health/heart/pre/mesher.py +++ b/src/ansys/health/heart/pre/mesher.py @@ -502,110 +502,6 @@ def _set_size_field_on_face_zones( return session -# TODO: fix method. -def _mesh_fluid_cavities( - fluid_boundaries: list[SurfaceMesh], - caps: list[SurfaceMesh], - workdir: str, - remesh_caps: bool = True, -) -> _FluentMesh: - """Mesh the fluid cavities. - - Parameters - ---------- - fluid_boundaries : List[SurfaceMesh] - List of fluid boundaries used for meshing. - caps : List[SurfaceMesh] - List of caps that close each of the cavities. - workdir : str - Working directory. - remesh_caps : bool, default: True - whether to remesh the caps. - - Returns - ------- - Path - Path to the ``.msh.h5`` volume mesh. - """ - if _uses_container: - mounted_volume = pyfluent.EXAMPLES_PATH - work_dir_meshing = os.path.join(mounted_volume, "tmp_meshing-fluid") - else: - work_dir_meshing = os.path.join(workdir, "meshing-fluid") - - if not os.path.isdir(work_dir_meshing): - os.makedirs(work_dir_meshing) - else: - files = glob.glob(os.path.join(work_dir_meshing, "*.stl")) - for f in files: - os.remove(f) - - # write all boundaries - for b in fluid_boundaries: - filename = os.path.join(work_dir_meshing, b.name.lower() + ".stl") - b.save(filename) - add_solid_name_to_stl(filename, b.name.lower(), file_type="binary") - - for c in caps: - filename = os.path.join(work_dir_meshing, c.name.lower() + ".stl") - c.save(filename) - add_solid_name_to_stl(filename, c.name.lower(), file_type="binary") - - session = _get_fluent_meshing_session(work_dir_meshing) - - if _launch_mode == LaunchMode.PIM: - # Upload files to session if in PIM or Container modes. - LOGGER.info(f"Uploading files to session with working directory {work_dir_meshing}...") - files = glob.glob(os.path.join(work_dir_meshing, "*.stl")) - for file in files: - session.upload(file) - # In PIM mode files are uploaded to the Fluents working directory. - work_dir_meshing = "." - - elif _launch_mode == LaunchMode.CONTAINER: - # NOTE: when using a Fluent container visible files - # will be in /mnt/pyfluent. (equal to mount target) - work_dir_meshing = "/mnt/pyfluent/meshing" - - session.tui.file.import_.cad(f"no {work_dir_meshing} *.stl") - - # merge objects - session.tui.objects.merge("'(*)", "model-fluid") - - # fix duplicate nodes - session.tui.diagnostics.face_connectivity.fix_free_faces("objects '(*)") - - # set size field - session.tui.size_functions.set_global_controls(1, 1, 1.2) - session.tui.scoped_sizing.compute("yes") - - # remesh all caps - if remesh_caps: - session.tui.boundary.remesh.remesh_constant_size("(cap_*)", "()", 40, 20, 1, "yes") - - # convert to mesh object - session.tui.objects.change_object_type("(*)", "mesh", "yes") - - # compute volumetric regions - session.tui.objects.volumetric_regions.compute("model-fluid") - - # mesh volume - session.tui.mesh.auto_mesh("model-fluid") - - # clean up - session.tui.objects.delete_all_geom() - session.tui.objects.delete_unreferenced_faces_and_edges() - - # write - file_path_mesh = os.path.join(workdir, "fluid-mesh.msh.h5") - session.tui.file.write_mesh(file_path_mesh) - - mesh = _FluentMesh(file_path_mesh) - mesh.load_mesh() - - return mesh - - def mesh_from_manifold_input_model( model: _InputModel, workdir: str | Path, @@ -1126,3 +1022,128 @@ def mesh_from_non_manifold_input_model( vtk_mesh = _post_meshing_cleanup(new_mesh) return vtk_mesh + + +def _mesh_fluid_from_boundaries( + fluid_boundaries: list[SurfaceMesh], + workdir: str, + mesh_size: float = 1.0, +) -> Mesh: + """Mesh the fluid from the boundary surfaces. + + Parameters + ---------- + fluid_boundaries : List[SurfaceMesh] + List of fluid boundaries used for meshing. + workdir : str + Working directory + mesh_size : float + Mesh size of the patches that. + + Returns + ------- + pv.UnstructuredGrid + Unstructured grid with fluid mesh. + """ + if _launch_mode in [LaunchMode.CONTAINER, LaunchMode.PIM]: + raise NotImplementedError( + "Meshing of fluid boundaries is not yet supported in PIM mode. " + "Please use the containerized or standalone mode." + ) + + if _uses_container: + mounted_volume = pyfluent.EXAMPLES_PATH + work_dir_meshing = os.path.join(mounted_volume, "tmp_meshing-fluid") + else: + work_dir_meshing = os.path.join(workdir, "meshing-fluid") + + if not os.path.isdir(work_dir_meshing): + os.makedirs(work_dir_meshing) + else: + files = glob.glob(os.path.join(work_dir_meshing, "*.stl")) + for f in files: + os.remove(f) + + # write all boundaries + for b in fluid_boundaries: + filename = os.path.join(work_dir_meshing, b.name.lower() + ".stl") + b.save(filename) + add_solid_name_to_stl(filename, b.name.lower(), file_type="binary") + + session = _get_fluent_meshing_session(work_dir_meshing) + + LOGGER.info(f"Starting Fluent Meshing in mode: {_launch_mode}") + + if _launch_mode == LaunchMode.PIM: + # Upload files to session if in PIM or Container modes. + LOGGER.info(f"Uploading files to session with working directory {work_dir_meshing}...") + files = glob.glob(os.path.join(work_dir_meshing, "*.stl")) + for file in files: + session.upload(file) + # In PIM mode files are uploaded to the Fluents working directory. + work_dir_meshing = "." + + elif _launch_mode == LaunchMode.CONTAINER: + # NOTE: when using a Fluent container visible files + # will be in /mnt/pyfluent. (equal to mount target) + work_dir_meshing = "/mnt/pyfluent/meshing" + + session.tui.file.import_.cad(f"no {work_dir_meshing} *.stl") + + # set size field + session.tui.size_functions.set_global_controls(mesh_size, mesh_size, 1.2) + session.tui.scoped_sizing.compute("yes") + + # create caps with uniform size. + session.tui.objects.merge("(*)", "fluid-mesh") + # object_names = list(session.scheme_eval.scheme_eval("(tgapi-util-get-all-object-name-list)")) + # session.tui.objects.rename_object(object_names[0], "fluid-mesh") + session.tui.diagnostics.face_connectivity.fix_free_faces( + "objects '(fluid-mesh) merge-nodes yes 1e-3" + ) + session.tui.objects.change_object_type("'(fluid-mesh)", "mesh", "yes") + + session.scheme_eval.scheme_eval("(tgapi-util-fill-holes-in-face-zone-list '(*) 1000)") + + patch_ids = session.scheme_eval.scheme_eval("(get-unreferenced-face-zones)") + + session.tui.objects.create( + "mesh-patches", + "fluid", + 3, + "({0})".format(" ".join([str(patch_id) for patch_id in patch_ids])), + "()", + "mesh", + "yes", + ) + session.tui.objects.merge("'(*)") + + # compute volume and mesh + session.tui.objects.volumetric_regions.compute("fluid-mesh", "no") + session.tui.mesh.auto_mesh("fluid-mesh", "yes", "pyr", "tet", "yes") + + session.tui.objects.delete_all_geom() + + file_path_mesh = os.path.join(workdir, "fluid-mesh.msh.h5") + if os.path.isfile(file_path_mesh): + os.remove(file_path_mesh) + session.tui.file.write_mesh(file_path_mesh, "ok") + + session.exit() + + # write to file. + mesh = _FluentMesh(file_path_mesh) + mesh.load_mesh(reconstruct_tetrahedrons=True) + + vtk_mesh = Mesh(mesh._to_vtk(add_cells=True, add_faces=True, remove_interior_faces=True)) + vtk_mesh.rename_array("face-zone-ids", "_surface-id") + vtk_mesh.rename_array("cell-zone-ids", "_volume-id") + + vtk_mesh._surface_id_to_name = { + int(fz.id): fz.name for fz in mesh.face_zones if fz.id in vtk_mesh.surface_ids + } + vtk_mesh._volume_id_to_name = { + int(cz.id): cz.name for cz in mesh.cell_zones if cz.id in vtk_mesh.volume_ids + } + + return vtk_mesh diff --git a/src/ansys/health/heart/utils/fluent_reader.py b/src/ansys/health/heart/utils/fluent_reader.py index 59d49e268..1f74980bb 100644 --- a/src/ansys/health/heart/utils/fluent_reader.py +++ b/src/ansys/health/heart/utils/fluent_reader.py @@ -365,7 +365,9 @@ def _convert_interior_faces_to_tetrahedrons(self) -> tuple[np.ndarray, np.ndarra return tetrahedrons, cell_ids # NOTE: no typehint due to lazy import of PpyVista - def _to_vtk(self, add_cells: bool = True, add_faces: bool = False) -> pv.UnstructuredGrid: + def _to_vtk( + self, add_cells: bool = True, add_faces: bool = False, remove_interior_faces: bool = False + ): """Convert the mesh to VTK unstructured grid or polydata. Parameters @@ -374,6 +376,8 @@ def _to_vtk(self, add_cells: bool = True, add_faces: bool = False) -> pv.Unstruc Whether to add cells to the VTK object. add_faces : bool, default: False Whether to add faces to the VTK object. + remove_interior_faces : bool, default: False + Remove interior faces. Returns ------- @@ -405,11 +409,16 @@ def _to_vtk(self, add_cells: bool = True, add_faces: bool = False) -> pv.Unstruc if add_faces: # add faces. + face_zones = self.face_zones + + if remove_interior_faces: + face_zones = [fz for fz in face_zones if "interior" not in fz.name] + grid_faces = pv.UnstructuredGrid() grid_faces.nodes = self.nodes - face_zone_ids = np.concatenate([[fz.id] * fz.faces.shape[0] for fz in self.face_zones]) - faces = np.array(np.concatenate([fz.faces for fz in self.face_zones]), dtype=int) + face_zone_ids = np.concatenate([[fz.id] * fz.faces.shape[0] for fz in face_zones]) + faces = np.array(np.concatenate([fz.faces for fz in face_zones]), dtype=int) faces = np.hstack([np.ones((faces.shape[0], 1), dtype=int) * 3, faces]) grid_faces = pv.UnstructuredGrid(