From 8caf750e55c2d89082de5c57eef1300407bac1e0 Mon Sep 17 00:00:00 2001 From: drodarie Date: Wed, 14 Jan 2026 16:51:35 +0100 Subject: [PATCH 1/7] fix: Replace MPI barrier with broadcast of boot status so that each process raises an error. --- packages/bsb-core/bsb/config/_attrs.py | 12 +++++++++--- .../bsb-core/bsb/morphologies/selector.py | 15 +++++++++------ packages/bsb-core/tests/test_issues.py | 19 ++++++++++++++++++- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/packages/bsb-core/bsb/config/_attrs.py b/packages/bsb-core/bsb/config/_attrs.py index 107b9c48..6abecc5b 100644 --- a/packages/bsb-core/bsb/config/_attrs.py +++ b/packages/bsb-core/bsb/config/_attrs.py @@ -462,6 +462,7 @@ def _root_is_booted(obj): def _boot_nodes(top_node, scaffold): + fail_boot = False for node in walk_nodes(top_node): node.scaffold = scaffold # Boot attributes @@ -476,9 +477,14 @@ def _boot_nodes(top_node, scaffold): try: run_hook(node, "boot") except Exception as e: - errr.wrap(BootError, e, prepend=f"Failed to boot {node}:") - # fixme: why is this here? Will deadlock in case of BootError on specific node only. - scaffold._comm.barrier() + fail_boot = [e, f"Failed to boot {node}:"] + fail_boot = scaffold._comm.allgather(fail_boot) + if any(fail_boot): + if fail_boot[scaffold._comm.get_rank()]: + e, prepend = fail_boot[scaffold._comm.get_rank()] + errr.wrap(BootError, e, prepend=prepend) + else: + raise BootError("Boot failed on other process.") def _unset_nodes(top_node): diff --git a/packages/bsb-core/bsb/morphologies/selector.py b/packages/bsb-core/bsb/morphologies/selector.py index a00d4343..d62c4254 100644 --- a/packages/bsb-core/bsb/morphologies/selector.py +++ b/packages/bsb-core/bsb/morphologies/selector.py @@ -87,15 +87,18 @@ class NeuroMorphoSelector(NameSelector, classmap_entry="from_neuromorpho"): _files = "dableFiles/" def __boot__(self): + fail_to_boot = False if self.scaffold.is_main_process(): try: morphos = self._scrape_nm(self.names) - except: - self.scaffold._comm.barrier() - raise - for name, morpho in morphos.items(): - self.scaffold.morphologies.save(name, morpho, overwrite=True) - self.scaffold._comm.barrier() + except Exception: + fail_to_boot = True + else: + for name, morpho in morphos.items(): + self.scaffold.morphologies.save(name, morpho, overwrite=True) + self.scaffold._comm.bcast(fail_to_boot) + if fail_to_boot: + raise @classmethod def _swc_url(cls, archive, name): diff --git a/packages/bsb-core/tests/test_issues.py b/packages/bsb-core/tests/test_issues.py index 97830ccc..95f01a1c 100644 --- a/packages/bsb-core/tests/test_issues.py +++ b/packages/bsb-core/tests/test_issues.py @@ -2,13 +2,18 @@ import unittest from types import NoneType +from bsb_test import RandomStorageFixture, timeout + from bsb import ( + BootError, CellType, CfgReferenceError, Chunk, + Configuration, FixedPositions, PlacementIndications, Reference, + Scaffold, config, ) @@ -45,7 +50,7 @@ class Root430: extensions = config.dict(type=Extension, required=True) -class TestIssues(unittest.TestCase): +class TestIssues(RandomStorageFixture, unittest.TestCase, engine_name="hdf5"): def test_430(self): with self.assertRaises(CfgReferenceError, msg="Regression of issue #430"): _config = Root430( @@ -62,3 +67,15 @@ def test_802(self): Chunk((0, 0, 0), (100, 100, 100)), {CellType(spatial=dict(radius=1, count=1)): PlacementIndications()}, ) + + @timeout(3) + def test_211(self): + """ + Test if the absence of a file does not make the reconstruction + get stuck in parallel. + """ + cfg = Configuration.default( + files=dict(annotations={"file": "path/to/missing/file.nrrd", "type": "nrrd"}), + ) + with self.assertRaises(BootError): + Scaffold(cfg, self.storage) From 7ffbefba8fa49c268ba70ab28571227c8eed0ca1 Mon Sep 17 00:00:00 2001 From: drodarie Date: Wed, 14 Jan 2026 17:01:54 +0100 Subject: [PATCH 2/7] fix: cleanup morphologies obtained from NeuromorphoSelector upon unsetting the node. --- packages/bsb-core/bsb/config/_attrs.py | 2 +- packages/bsb-core/bsb/morphologies/selector.py | 11 ++++++++++- packages/bsb-core/tests/test_selectors.py | 3 +-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/packages/bsb-core/bsb/config/_attrs.py b/packages/bsb-core/bsb/config/_attrs.py index 6abecc5b..c1a7db67 100644 --- a/packages/bsb-core/bsb/config/_attrs.py +++ b/packages/bsb-core/bsb/config/_attrs.py @@ -489,13 +489,13 @@ def _boot_nodes(top_node, scaffold): def _unset_nodes(top_node): for node in walk_nodes(top_node): + run_hook(node, "unboot") with contextlib.suppress(Exception): del node.scaffold node._config_parent = None node._config_key = None if hasattr(node, "_config_index"): node._config_index = None - run_hook(node, "unboot") class ConfigurationAttribute: diff --git a/packages/bsb-core/bsb/morphologies/selector.py b/packages/bsb-core/bsb/morphologies/selector.py index d62c4254..2c4c8fc6 100644 --- a/packages/bsb-core/bsb/morphologies/selector.py +++ b/packages/bsb-core/bsb/morphologies/selector.py @@ -1,5 +1,6 @@ import abc import concurrent +import contextlib import re import tempfile import typing @@ -12,7 +13,7 @@ from .. import config from ..config import types from ..config._attrs import cfglist -from ..exceptions import MissingMorphologyError, SelectorError +from ..exceptions import MissingMorphologyError, MorphologyRepositoryError, SelectorError from .parsers import parse_morphology_file if typing.TYPE_CHECKING: # pragma: nocover @@ -100,6 +101,14 @@ def __boot__(self): if fail_to_boot: raise + def __unboot__(self): + if self.scaffold.is_main_process(): + for name in self.names: + with contextlib.suppress(MorphologyRepositoryError): + # remove morphology if it was saved in the scaffold. + self.scaffold.morphologies.remove(name) + self.scaffold._comm.barrier() + @classmethod def _swc_url(cls, archive, name): return ( diff --git a/packages/bsb-core/tests/test_selectors.py b/packages/bsb-core/tests/test_selectors.py index b4612eb3..1d7c73a9 100644 --- a/packages/bsb-core/tests/test_selectors.py +++ b/packages/bsb-core/tests/test_selectors.py @@ -1,6 +1,6 @@ import unittest -from bsb_test import RandomStorageFixture, skip_parallel, skipIfOffline +from bsb_test import RandomStorageFixture, skipIfOffline from bsb import ( MPI, @@ -105,7 +105,6 @@ def test_nm_selector(self): m = s.morphologies.select(*ct.spatial.morphologies)[0] self.assertEqual(name, m.get_meta()["neuron_name"], "meta not stored") - @skip_parallel # https://github.com/dbbs-lab/bsb/issues/187 @skipIfOffline(scheme=NeuroMorphoScheme()) def test_nm_selector_wrong_name(self): ct = CellType( From f8041ac3f186c3380d5cedd28e4bbb115b19f0c4 Mon Sep 17 00:00:00 2001 From: drodarie Date: Wed, 14 Jan 2026 17:11:35 +0100 Subject: [PATCH 3/7] fix: allow again test_nm_scheme in parallel --- packages/bsb-core/tests/test_util.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/bsb-core/tests/test_util.py b/packages/bsb-core/tests/test_util.py index fbf0ee7d..e4c538d1 100644 --- a/packages/bsb-core/tests/test_util.py +++ b/packages/bsb-core/tests/test_util.py @@ -5,7 +5,6 @@ FixedPosConfigFixture, NumpyTestCase, RandomStorageFixture, - skip_parallel, skipIfOffline, ) from scipy.spatial.transform import Rotation @@ -70,7 +69,6 @@ def test_rotation_matrix_from_vectors(self): class TestUriSchemes(RandomStorageFixture, unittest.TestCase, engine_name="fs"): - @skip_parallel # see https://github.com/dbbs-lab/bsb/issues/197 @skipIfOffline(scheme=NeuroMorphoScheme()) def test_nm_scheme(self): file = FileDependency( From d7bbb6d2ef77c56e924122d94b4630d0f9b07aca Mon Sep 17 00:00:00 2001 From: drodarie Date: Wed, 14 Jan 2026 17:45:12 +0100 Subject: [PATCH 4/7] fix: remove old codecov badge in bsb-core README.md --- packages/bsb-core/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/bsb-core/README.md b/packages/bsb-core/README.md index 1f1725e7..7cb0e521 100644 --- a/packages/bsb-core/README.md +++ b/packages/bsb-core/README.md @@ -1,7 +1,6 @@ [![Build Status](https://github.com/dbbs-lab/bsb/actions/workflows/main.yml/badge.svg)](https://github.com/dbbs-lab/bsb/actions/workflows/main.yml) [![Documentation](https://readthedocs.org/projects/bsb-core/badge/?version=latest)](https://bsb-core.readthedocs.io/en/latest/?badge=latest) [![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) -[![codecov](https://codecov.io/gh/dbbs-lab/bsb-core/branch/main/graph/badge.svg)](https://codecov.io/gh/dbbs-lab/bsb-core) # bsb-core From 97fb9f798f63baafaa787a7aa0a3b2d459bba97a Mon Sep 17 00:00:00 2001 From: drodarie Date: Wed, 14 Jan 2026 19:23:06 +0100 Subject: [PATCH 5/7] fix: Use ContextManager to deal with error broadcast. Add to MPIService functions --- packages/bsb-core/bsb/config/_attrs.py | 17 ++----- .../bsb-core/bsb/morphologies/selector.py | 21 ++++----- packages/bsb-core/bsb/services/mpi.py | 47 +++++++++++++++++++ 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/packages/bsb-core/bsb/config/_attrs.py b/packages/bsb-core/bsb/config/_attrs.py index c1a7db67..2cf31321 100644 --- a/packages/bsb-core/bsb/config/_attrs.py +++ b/packages/bsb-core/bsb/config/_attrs.py @@ -462,7 +462,6 @@ def _root_is_booted(obj): def _boot_nodes(top_node, scaffold): - fail_boot = False for node in walk_nodes(top_node): node.scaffold = scaffold # Boot attributes @@ -474,17 +473,11 @@ def _boot_nodes(top_node, scaffold): boot(node, scaffold) booted.add(boot) # Boot node hook - try: - run_hook(node, "boot") - except Exception as e: - fail_boot = [e, f"Failed to boot {node}:"] - fail_boot = scaffold._comm.allgather(fail_boot) - if any(fail_boot): - if fail_boot[scaffold._comm.get_rank()]: - e, prepend = fail_boot[scaffold._comm.get_rank()] - errr.wrap(BootError, e, prepend=prepend) - else: - raise BootError("Boot failed on other process.") + with scaffold._comm.try_all(BootError("Boot failed on other process.")): + try: + run_hook(node, "boot") + except Exception as e: + errr.wrap(BootError, e, prepend=f"Failed to boot {node}:") def _unset_nodes(top_node): diff --git a/packages/bsb-core/bsb/morphologies/selector.py b/packages/bsb-core/bsb/morphologies/selector.py index 2c4c8fc6..87404b72 100644 --- a/packages/bsb-core/bsb/morphologies/selector.py +++ b/packages/bsb-core/bsb/morphologies/selector.py @@ -88,18 +88,15 @@ class NeuroMorphoSelector(NameSelector, classmap_entry="from_neuromorpho"): _files = "dableFiles/" def __boot__(self): - fail_to_boot = False - if self.scaffold.is_main_process(): - try: - morphos = self._scrape_nm(self.names) - except Exception: - fail_to_boot = True - else: - for name, morpho in morphos.items(): - self.scaffold.morphologies.save(name, morpho, overwrite=True) - self.scaffold._comm.bcast(fail_to_boot) - if fail_to_boot: - raise + with self.scaffold._comm.try_main(): + if self.scaffold.is_main_process(): + try: + morphos = self._scrape_nm(self.names) + except Exception: + raise + else: + for name, morpho in morphos.items(): + self.scaffold.morphologies.save(name, morpho, overwrite=True) def __unboot__(self): if self.scaffold.is_main_process(): diff --git a/packages/bsb-core/bsb/services/mpi.py b/packages/bsb-core/bsb/services/mpi.py index 09c88940..bcda38f5 100644 --- a/packages/bsb-core/bsb/services/mpi.py +++ b/packages/bsb-core/bsb/services/mpi.py @@ -1,5 +1,6 @@ import functools import os +from contextlib import AbstractContextManager from ..exceptions import DependencyError from ._util import MockModule @@ -74,6 +75,52 @@ def Unlock(self, rank): return WindowMock() + def try_all(self, default_exception): + """ + Create a context manager that checks if any exception is raised by any processes + within the context, and make all other processes raise an exception in that case + + :param Exception default_exception: Exception instance to raise for all processes + that did not raise during the context. + :return: context manager + """ + comm = self + + class bcast_all(AbstractContextManager): + def __enter__(self): + pass + + def __exit__(self, exctype, excinst, exctb): + exceptions = comm.allgather(excinst) + if any(exceptions): + if exceptions[comm.get_rank()]: + raise exceptions[comm.get_rank()] + else: + raise default_exception + + return bcast_all() + + def try_main(self): + """ + Create a context manager that checks if any exception is raised by the main + process within the context, and make all other processes raise this exception in + that case + + :return: context manager + """ + comm = self + + class bcast(AbstractContextManager): + def __enter__(self): + pass + + def __exit__(self, exctype, excinst, exctb): + exception = comm.bcast(excinst) + if exception is not None: + raise exception + + return bcast() + class MPIModule(MockModule): """ From 088262b026d8f21a2f2a21521e28b8ce57784ba8 Mon Sep 17 00:00:00 2001 From: drodarie Date: Thu, 15 Jan 2026 12:41:20 +0100 Subject: [PATCH 6/7] fix: Use decorator instead of class for ContextManager functions --- packages/bsb-core/bsb/config/_attrs.py | 2 +- .../bsb-core/bsb/morphologies/selector.py | 22 +++---- packages/bsb-core/bsb/services/mpi.py | 64 ++++++++++--------- 3 files changed, 44 insertions(+), 44 deletions(-) diff --git a/packages/bsb-core/bsb/config/_attrs.py b/packages/bsb-core/bsb/config/_attrs.py index 2cf31321..6d977ae3 100644 --- a/packages/bsb-core/bsb/config/_attrs.py +++ b/packages/bsb-core/bsb/config/_attrs.py @@ -473,7 +473,7 @@ def _boot_nodes(top_node, scaffold): boot(node, scaffold) booted.add(boot) # Boot node hook - with scaffold._comm.try_all(BootError("Boot failed on other process.")): + with scaffold._comm.try_all(BootError("Boot failed on different rank.")): try: run_hook(node, "boot") except Exception as e: diff --git a/packages/bsb-core/bsb/morphologies/selector.py b/packages/bsb-core/bsb/morphologies/selector.py index 87404b72..6c922593 100644 --- a/packages/bsb-core/bsb/morphologies/selector.py +++ b/packages/bsb-core/bsb/morphologies/selector.py @@ -90,21 +90,17 @@ class NeuroMorphoSelector(NameSelector, classmap_entry="from_neuromorpho"): def __boot__(self): with self.scaffold._comm.try_main(): if self.scaffold.is_main_process(): - try: - morphos = self._scrape_nm(self.names) - except Exception: - raise - else: - for name, morpho in morphos.items(): - self.scaffold.morphologies.save(name, morpho, overwrite=True) + morphos = self._scrape_nm(self.names) + for name, morpho in morphos.items(): + self.scaffold.morphologies.save(name, morpho, overwrite=True) def __unboot__(self): - if self.scaffold.is_main_process(): - for name in self.names: - with contextlib.suppress(MorphologyRepositoryError): - # remove morphology if it was saved in the scaffold. - self.scaffold.morphologies.remove(name) - self.scaffold._comm.barrier() + with self.scaffold._comm.try_main(): + if self.scaffold.is_main_process(): + for name in self.names: + with contextlib.suppress(MorphologyRepositoryError): + # remove morphology if it was saved in the scaffold. + self.scaffold.morphologies.remove(name) @classmethod def _swc_url(cls, archive, name): diff --git a/packages/bsb-core/bsb/services/mpi.py b/packages/bsb-core/bsb/services/mpi.py index bcda38f5..635ba5cc 100644 --- a/packages/bsb-core/bsb/services/mpi.py +++ b/packages/bsb-core/bsb/services/mpi.py @@ -1,6 +1,6 @@ +import contextlib import functools import os -from contextlib import AbstractContextManager from ..exceptions import DependencyError from ._util import MockModule @@ -75,7 +75,8 @@ def Unlock(self, rank): return WindowMock() - def try_all(self, default_exception): + @contextlib.contextmanager + def try_all(self, default_exception=None): """ Create a context manager that checks if any exception is raised by any processes within the context, and make all other processes raise an exception in that case @@ -84,42 +85,45 @@ def try_all(self, default_exception): that did not raise during the context. :return: context manager """ - comm = self - - class bcast_all(AbstractContextManager): - def __enter__(self): - pass - - def __exit__(self, exctype, excinst, exctb): - exceptions = comm.allgather(excinst) - if any(exceptions): - if exceptions[comm.get_rank()]: - raise exceptions[comm.get_rank()] - else: - raise default_exception - - return bcast_all() - + exc_instance = None + default_exception = default_exception or RuntimeError( + "An error occurred on a different rank" + ) + try: + yield + except Exception as e: + exc_instance = e + finally: + exceptions = self.allgather(exc_instance) + if any(exceptions): + raise ( + exceptions[self.get_rank()] + if exceptions[self.get_rank()] + else default_exception + ) + + @contextlib.contextmanager def try_main(self): """ Create a context manager that checks if any exception is raised by the main process within the context, and make all other processes raise this exception in that case + Warning: All processes will still enter the context, but only main exception will + be raised. :return: context manager """ - comm = self - - class bcast(AbstractContextManager): - def __enter__(self): - pass - - def __exit__(self, exctype, excinst, exctb): - exception = comm.bcast(excinst) - if exception is not None: - raise exception - - return bcast() + exc_instance = None + try: + # All processes have to enter the context + # contextlib will throw an error if one does not yield + yield + except Exception as e: + exc_instance = e + finally: + exception = self.bcast(exc_instance) + if exception is not None: + raise exception class MPIModule(MockModule): From 413a0b43d6ae84e48ef4451a9aa9b2ec170a2462 Mon Sep 17 00:00:00 2001 From: drodarie Date: Mon, 19 Jan 2026 13:07:57 +0100 Subject: [PATCH 7/7] fix: implement review feedback --- packages/bsb-core/bsb/services/mpi.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/packages/bsb-core/bsb/services/mpi.py b/packages/bsb-core/bsb/services/mpi.py index 635ba5cc..22e758cd 100644 --- a/packages/bsb-core/bsb/services/mpi.py +++ b/packages/bsb-core/bsb/services/mpi.py @@ -93,14 +93,14 @@ def try_all(self, default_exception=None): yield except Exception as e: exc_instance = e - finally: - exceptions = self.allgather(exc_instance) - if any(exceptions): - raise ( - exceptions[self.get_rank()] - if exceptions[self.get_rank()] - else default_exception - ) + + exceptions = self.allgather(exc_instance) + if any(exceptions): + raise ( + exceptions[self.get_rank()] + if exceptions[self.get_rank()] + else default_exception + ) @contextlib.contextmanager def try_main(self): @@ -120,10 +120,10 @@ def try_main(self): yield except Exception as e: exc_instance = e - finally: - exception = self.bcast(exc_instance) - if exception is not None: - raise exception + + exception = self.bcast(exc_instance) + if exception is not None: + raise exception class MPIModule(MockModule):