Changes from all commits
21 commits
e510c02
Add a failing test (using MPI)
ziotom78 Mar 14, 2025
1d284dd
Add a method `_GridCommClass.is_this_process_in_grid()`
ziotom78 Mar 14, 2025
f2599ef
Fix Observations._set_attributes_from_list_of_dict()
ziotom78 Mar 14, 2025
f07cc8c
Use `Observation.no_mueller_hwp()`
ziotom78 Mar 14, 2025
41666d7
Use `Observation.no_mueller_hwp()`
ziotom78 Mar 14, 2025
832f964
Be sure that each observation has a name
ziotom78 Mar 14, 2025
8a98566
Fix spelling error in comment
ziotom78 Mar 14, 2025
44294cf
Remove unused whitespace
ziotom78 Mar 14, 2025
6099305
Fix bugs in the way the map-makers use the MPI grid
ziotom78 Mar 14, 2025
1df9369
Run MPI tests with 1, 2, 3 processes
ziotom78 Mar 14, 2025
f179d02
Use --oversubscribe with OpenMPI
ziotom78 Mar 15, 2025
dd4d052
Merge branch 'master' into fix_364
ziotom78 Mar 15, 2025
1db3aac
fixed destriper to make it use the MPI grid communicator
anand-avinash Mar 19, 2025
2954621
added hwp to test_mpi.py::test_empty_observations
anand-avinash Mar 19, 2025
a1926c3
Merge changes from #365 (Avinash’s PR)
ziotom78 Mar 19, 2025
bb293b1
Improve the logic behind the set up of Observation.mueller_hwp
ziotom78 Mar 19, 2025
5794b02
Avoid using pytest
ziotom78 Mar 19, 2025
c3c3987
Do not use a try…finally block with a MPI barrier in it
ziotom78 Mar 19, 2025
99179fc
Avoid printing stuff to video
ziotom78 Mar 19, 2025
150f395
fixed make_binned_map_splits for the case when maps have to be writte…
anand-avinash Mar 24, 2025
dcc8405
Merge branch 'master' into fix_364
ziotom78 Apr 8, 2025
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
@@ -84,7 +84,7 @@ jobs:
       - name: Tests MPICH
         if: ${{ matrix.mpi == 'mpich' }}
         run: |
-          for proc in 1 5 9 ; do
+          for proc in 1 2 3; do
             echo "Running MPI test ($MPI) with $proc processes"
             PYTHONPATH=. mpiexec -n $proc python3 ./test/test_mpi.py
             PYTHONPATH=. mpiexec -n $proc python3 ./test/test_detector_blocks.py
@@ -93,8 +93,8 @@
       - name: Tests OpenMPI
         if: ${{ matrix.mpi == 'openmpi' }}
         run: |
-          for proc in 1 2 ; do
+          for proc in 1 2 3; do
             echo "Running MPI test ($MPI) with $proc processes"
-            PYTHONPATH=. mpiexec -n $proc python3 ./test/test_mpi.py
-            PYTHONPATH=. mpiexec -n $proc python3 ./test/test_detector_blocks.py
+            PYTHONPATH=. mpiexec --oversubscribe -n $proc python3 ./test/test_mpi.py
+            PYTHONPATH=. mpiexec --oversubscribe -n $proc python3 ./test/test_detector_blocks.py
           done
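The loop now exercises every test with 1, 2, and 3 MPI processes, and OpenMPI gets `--oversubscribe` so three ranks can start even on a two-core CI runner. Since commit 5794b02 drops pytest for these scripts, each test file is a plain Python program whose exit status is the pass/fail signal. A minimal sketch of that pattern (a hypothetical file, not the repository's actual test code):

```python
# Sketch of a standalone MPI test: run as `mpiexec -n 3 python3 test_sketch.py`.
# It must pass for *any* number of ranks, and serially when mpi4py is missing.
import numpy as np

try:
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
except ImportError:
    comm = None  # serial fallback


def test_allreduce_sum():
    size = comm.size if comm else 1
    rank = comm.rank if comm else 0
    # Every rank contributes its own rank number; the total is known exactly
    total = comm.allreduce(rank) if comm else rank
    assert total == size * (size - 1) // 2


if __name__ == "__main__":
    test_allreduce_sum()
    # No pytest: a zero exit status is all the CI loop checks
```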
2 changes: 1 addition & 1 deletion litebird_sim/io.py
@@ -471,7 +471,7 @@ def write_list_of_observations(
         observations = [observations]
     except IndexError:
         # Empty list
-        # We do not want to return here, as we still need to participate to
+        # We do not want to return here, as we still need to participate in
         # the call to _compute_global_start_index below
         observations = []  # type: List[Observation]

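The corrected comment points at a real MPI constraint: collective operations must be entered by every rank of the communicator, even ranks that hold an empty list of observations, or the remaining ranks block forever. An illustration of the rule (standalone, not litebird_sim code):

```python
# Run with e.g. `mpiexec -n 3 python3 sketch.py`; the last rank is "empty".
from mpi4py import MPI

comm = MPI.COMM_WORLD
observations = [] if comm.rank == comm.size - 1 else ["obs_a", "obs_b"]

# WRONG: `if not observations: return` here would skip the collective call on
# the empty rank and deadlock everybody else inside it.
# RIGHT: fall through with an empty list and join the collective anyway.
start_index = comm.exscan(len(observations))  # exclusive prefix sum
if start_index is None:  # mpi4py returns None on rank 0
    start_index = 0
print(f"rank {comm.rank}: observations start at global index {start_index}")
```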
32 changes: 19 additions & 13 deletions litebird_sim/mapmaking/binner.py
@@ -7,24 +7,23 @@
 # functions and variable defined here use the same letters and symbols of that
 # paper. We refer to it in code comments and docstrings as "KurkiSuonio2009".
 
+import logging
 from dataclasses import dataclass
+from typing import Union, List, Any, Optional, Callable
+
+import healpy as hp
 import numpy as np
 import numpy.typing as npt
+from ducc0.healpix import Healpix_Base
 from numba import njit
-import healpy as hp
 
-from typing import Union, List, Any, Optional, Callable
-from litebird_sim.observations import Observation
-from litebird_sim.coordinates import CoordinateSystem
-from litebird_sim.pointings import get_hwp_angle
-from litebird_sim.hwp import HWP
 from litebird_sim import mpi
-from ducc0.healpix import Healpix_Base
+from litebird_sim.coordinates import CoordinateSystem
 from litebird_sim.healpix import nside_to_npix
-
-import logging
-
+from litebird_sim.hwp import HWP
+from litebird_sim.mpi import MPI_COMM_GRID
+from litebird_sim.observations import Observation
+from litebird_sim.pointings import get_hwp_angle
 from .common import (
     _compute_pixel_indices,
     _normalize_observations_and_pointings,
@@ -263,7 +262,9 @@ def _build_nobs_matrix(
                 for i in range(len(obs_list) - 1)
             ]
         ):
-            nobs_matrix = obs_list[0].comm.allreduce(nobs_matrix, mpi.MPI.SUM)
+            nobs_matrix = mpi.MPI_COMM_GRID.COMM_OBS_GRID.allreduce(
+                nobs_matrix, mpi.MPI.SUM
+            )
         else:
             raise NotImplementedError(
                 "All observations must be distributed over the same MPI groups"
@@ -282,7 +283,7 @@ def make_binned_map(
     detector_split: str = "full",
     time_split: str = "full",
     pointings_dtype=np.float64,
-) -> BinnerResult:
+) -> Optional[BinnerResult]:
     """Bin Map-maker
 
     Map a list of observations
Map a list of observations
@@ -319,9 +320,14 @@

     Returns:
         An instance of the class :class:`.MapMakerResult`. If the observations are
-        distributed over MPI Processes, all of them get a copy of the same object.
+        distributed over MPI Processes, all of them get a copy of the same object,
+        unless the current MPI process does not hold any TOD sample: in the latter
+        case, ``None`` is returned.
     """
 
+    if not MPI_COMM_GRID.is_this_process_in_grid():
+        return None
+
     if not components:
         components = ["tod"]
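Because the return type is now `Optional[BinnerResult]`, calling code that may run on ranks outside the observation grid should branch on `None`. A hedged usage sketch (the observation list and the attribute access are illustrative, not taken from the library):

```python
# Hypothetical call site; `observations` is assumed to be a list of Observation
result = make_binned_map(nside=64, observations=observations)
if result is None:
    # This rank holds no TOD samples and took no part in the binning
    pass
else:
    # Every rank inside the grid receives the same copy of the result;
    # see the BinnerResult dataclass for the actual field names
    maps = result.binned_map
```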
23 changes: 11 additions & 12 deletions litebird_sim/mapmaking/common.py
@@ -1,11 +1,11 @@
 from dataclasses import dataclass
 from typing import Union, List, Tuple, Callable
 
+import astropy.time
 import numpy as np
 import numpy.typing as npt
-from numba import njit
-import astropy.time
-
 from ducc0.healpix import Healpix_Base
+from numba import njit
 
 from litebird_sim.coordinates import CoordinateSystem, rotate_coordinates_e2g
 from litebird_sim.observations import Observation
@@ -109,15 +109,14 @@ def get_map_making_weights(
     except AttributeError:
         weights = np.ones(observations.n_detectors)
 
-    if check and MPI_COMM_GRID.COMM_OBS_GRID != MPI_COMM_GRID.COMM_NULL:
-        if check:
-            # Check that there are no weird weights
-            assert np.all(
-                np.isfinite(weights)
-            ), f"Not all the detectors' weights are finite numbers: {weights}"
-            assert np.all(
-                weights > 0.0
-            ), f"Not all the detectors' weights are positive: {weights}"
+    if check and MPI_COMM_GRID.is_this_process_in_grid():
+        # Check that there are no weird weights
+        assert np.all(
+            np.isfinite(weights)
+        ), f"Not all the detectors' weights are finite numbers: {weights}"
+        assert np.all(
+            weights > 0.0
+        ), f"Not all the detectors' weights are positive: {weights}"
 
     return weights

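For context, the weights being validated here come from each detector's white-noise level, so a non-finite or non-positive value signals broken detector metadata rather than a numerical edge case. A rough sketch of the idea (an illustration with made-up numbers, not the function's exact code):

```python
import numpy as np

sampling_rate_hz = 19.0
net_ukrts = np.array([50.0, 60.0])  # noise-equivalent temperature, uK*sqrt(s)

# Per-sample noise variance; the map-maker weights each sample by its inverse
sigma2 = net_ukrts**2 * sampling_rate_hz
assert np.all(np.isfinite(sigma2)) and np.all(sigma2 > 0.0)
```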
37 changes: 19 additions & 18 deletions litebird_sim/mapmaking/destriper.py
@@ -1,32 +1,22 @@
 # -*- encoding: utf-8 -*-
+import gc
 import logging
 import time
 
-# The implementation of the destriping algorithm provided here is based on the paper
-# «Destriping CMB temperature and polarization maps» by Kurki-Suonio et al. 2009,
-# A&A 506, 1511–1539 (2009), https://dx.doi.org/10.1051/0004-6361/200912361
-#
-# It is important to have that paper at hand while reading this code, as many
-# functions and variable defined here use the same letters and symbols of that
-# paper. We refer to it in code comments and docstrings as "KurkiSuonio2009".
-
 from dataclasses import dataclass
-import gc
 from pathlib import Path
+from typing import Callable, Union, List, Optional, Tuple, Any, Dict
 
+import healpy as hp
 import numpy as np
 import numpy.typing as npt
 from ducc0.healpix import Healpix_Base
 from numba import njit, prange
-import healpy as hp
 
-from litebird_sim.mpi import MPI_ENABLED, MPI_COMM_WORLD, MPI_COMM_GRID
-from typing import Callable, Union, List, Optional, Tuple, Any, Dict
+from litebird_sim.coordinates import CoordinateSystem, coord_sys_to_healpix_string
 from litebird_sim.hwp import HWP
+from litebird_sim.mpi import MPI_ENABLED, MPI_COMM_WORLD, MPI_COMM_GRID
 from litebird_sim.observations import Observation
 from litebird_sim.pointings import get_hwp_angle
-from litebird_sim.coordinates import CoordinateSystem, coord_sys_to_healpix_string
 
 from .common import (
     _compute_pixel_indices,
     _normalize_observations_and_pointings,
@@ -39,6 +29,14 @@
     _build_mask_time_split,
 )
 
+# The implementation of the destriping algorithm provided here is based on the paper
+# «Destriping CMB temperature and polarization maps» by Kurki-Suonio et al. 2009,
+# A&A 506, 1511–1539 (2009), https://dx.doi.org/10.1051/0004-6361/200912361
+#
+# It is important to have that paper at hand while reading this code, as many
+# functions and variable defined here use the same letters and symbols of that
+# paper. We refer to it in code comments and docstrings as "KurkiSuonio2009".
+
 if MPI_ENABLED:
     import mpi4py.MPI
 
@@ -501,7 +499,7 @@ def _build_nobs_matrix(
     )
 
     # Now we must accumulate the result of every MPI process
-    if MPI_ENABLED and MPI_COMM_GRID.COMM_OBS_GRID != MPI_COMM_GRID.COMM_NULL:
+    if MPI_ENABLED and MPI_COMM_GRID.is_this_process_in_grid():
         MPI_COMM_GRID.COMM_OBS_GRID.Allreduce(
             mpi4py.MPI.IN_PLACE, nobs_matrix, op=mpi4py.MPI.SUM
         )
)
Expand Down Expand Up @@ -750,7 +748,7 @@ def _compute_binned_map(
baseline_lengths=cur_baseline_lengths,
)

if MPI_ENABLED:
if MPI_ENABLED and MPI_COMM_GRID.is_this_process_in_grid():
MPI_COMM_GRID.COMM_OBS_GRID.Allreduce(
mpi4py.MPI.IN_PLACE, output_sky_map, op=mpi4py.MPI.SUM
)
@@ -995,7 +993,7 @@ def _mpi_dot(a: List[npt.ArrayLike], b: List[npt.ArrayLike]) -> float:
     # we call “flatten” to make them 1D and produce *one* scalar out of
     # the dot product
     local_result = sum([np.dot(x1.flatten(), x2.flatten()) for (x1, x2) in zip(a, b)])
-    if MPI_ENABLED:
+    if MPI_ENABLED and MPI_COMM_GRID.is_this_process_in_grid():
         return MPI_COMM_GRID.COMM_OBS_GRID.allreduce(local_result, op=mpi4py.MPI.SUM)
     else:
         return local_result
@@ -1575,6 +1573,9 @@ def my_gui_callback(
"""
elapsed_time_s = time.monotonic()

if not MPI_COMM_GRID.is_this_process_in_grid():
return None

if not components:
components = ["tod"]

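All four destriper hunks apply the same rule: compute locally, then reduce only on `COMM_OBS_GRID`, and never issue a collective from a rank outside the grid. A standalone sketch of the `_mpi_dot` variant (the parameter names stand in for the `MPI_COMM_GRID` members; this is not the library's code):

```python
import numpy as np
from mpi4py import MPI


def mpi_dot(a_blocks, b_blocks, grid_comm, in_grid):
    # `grid_comm`/`in_grid` play the role of MPI_COMM_GRID.COMM_OBS_GRID and
    # MPI_COMM_GRID.is_this_process_in_grid() in the real code
    local = sum(np.dot(x.ravel(), y.ravel()) for x, y in zip(a_blocks, b_blocks))
    if in_grid:
        return grid_comm.allreduce(local, op=MPI.SUM)
    return local  # out-of-grid ranks just keep their (empty) local sum


comm = MPI.COMM_WORLD
blocks = [np.full(4, float(comm.rank))]  # toy stand-in for baseline vectors
print(mpi_dot(blocks, blocks, comm, in_grid=True))  # COMM_WORLD as the "grid"
```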
9 changes: 9 additions & 0 deletions litebird_sim/mpi.py
@@ -56,6 +56,15 @@ def _set_comm_obs_grid(self, comm_obs_grid):
     def _set_null_comm(self, comm_null):
         self._MPI_COMM_NULL = comm_null
 
+    def is_this_process_in_grid(self) -> bool:
+        """
+        Return ``True`` if the current MPI process is in the MPI grid.
+
+        If the function returns ``False``, then the current MPI process is not handling
+        any TOD sample.
+        """
+        return self._MPI_COMM_OBS_GRID != self._MPI_COMM_NULL
+
 
 #: Global variable equal either to `mpi4py.MPI.COMM_WORLD` or a object
 #: that defines the member variables `rank = 0` and `size = 1`.
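This method is the single predicate behind all the guards in this PR. A hedged sketch of a call site (the function below is illustrative, not part of the library):

```python
from litebird_sim.mpi import MPI_COMM_GRID


def reduce_over_grid(local_value):
    if not MPI_COMM_GRID.is_this_process_in_grid():
        # This rank owns no detectors/TOD, so COMM_OBS_GRID is the NULL
        # communicator here and must not be used in collective calls
        return None
    return MPI_COMM_GRID.COMM_OBS_GRID.allreduce(local_value)
```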
72 changes: 46 additions & 26 deletions litebird_sim/observations.py
@@ -1,18 +1,17 @@
 # -*- encoding: utf-8 -*-
 
-from dataclasses import dataclass
-from typing import Union, List, Any, Optional
 import numbers
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Dict, Union, List, Any, Optional
 
 import astropy.time
 import numpy as np
 import numpy.typing as npt
 
-from collections import defaultdict
-
 from .coordinates import DEFAULT_TIME_SCALE
-from .distribute import distribute_evenly, distribute_detector_blocks
 from .detectors import DetectorInfo
+from .distribute import distribute_evenly, distribute_detector_blocks
 from .mpi import MPI_COMM_GRID, _SerialMpiCommunicator
 
 
@@ -39,6 +38,14 @@ class TodDescription:
     description: str
 
 
+def _smart_list_to_numpy_array(lst: list) -> Union[list, npt.NDArray]:
+    # Anything in the form [None, None, …, None] is *not* converted to a NumPy array
+    if isinstance(lst, list) and all(x is None for x in lst):
+        return lst
+
+    return np.array(lst)
+
+
 class Observation:
     """An observation made by one or multiple detectors over some time window
 
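A quick demonstration of the case this helper singles out (the values are made up): converting an all-`None` list with `np.array` would yield a `dtype=object` array, losing the "this attribute was never set" signal that later code relies on.

```python
import numpy as np

print(np.array([1.0, 2.0]).dtype)    # float64: the normal conversion
print(np.array([None, None]).dtype)  # object: what the early return avoids
# _smart_list_to_numpy_array([None, None]) returns the list unchanged instead
```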
@@ -241,40 +248,50 @@ def _get_local_start_time_start_and_n_samples(self):

         return self.start_time_global + start * delta, start, num
 
-    def _set_attributes_from_list_of_dict(self, list_of_dict, root):
+    def _set_attributes_from_list_of_dict(
+        self, list_of_dict: List[Dict[str, str]], root: int
+    ) -> None:
         """
         Take a list of dictionaries describing each detector and propagate them
         """
         np.testing.assert_equal(len(list_of_dict), self.n_detectors_global)
 
         # Turn list of dict into dict of arrays
         if not self.comm or self.comm.rank == root:
             # Build a list of all the keys in the dictionaries contained within
-            # `list_of_dict` (which is a *list* of dictionaries)
+            # `list_of_dict` (which is a *list* of dictionaries). `keys` is a list of
+            # strings like `name`, `net_ukrts`, `fknee_mhz`, etc.
             keys = list(set().union(*list_of_dict) - set(dir(self)))
 
             # This will be the dictionary associating each key with the
-            # *array* of value for that dictionary
-            dict_of_array = {k: [] for k in keys}
+            # *array* of values for that dictionary
+            dict_of_array = {cur_key: [] for cur_key in keys}
 
             # This array associates either np.nan or None to each type;
             # the former indicates that the value is a NumPy array, while
             # None is used for everything else
             nan_or_none = {}
-            for k in keys:
-                for d in list_of_dict:
-                    if k in d:
+            for cur_key in keys:
+                for cur_det_dict in list_of_dict:
+                    if cur_key in cur_det_dict:
                         try:
-                            nan_or_none[k] = np.nan * d[k]
+                            nan_or_none[cur_key] = np.nan * cur_det_dict[cur_key]
                         except TypeError:
-                            nan_or_none[k] = None
+                            nan_or_none[cur_key] = None
                         break
 
             # Finally, build `dict_of_array`
-            for d in list_of_dict:
-                for k in keys:
-                    dict_of_array[k].append(d.get(k, nan_or_none[k]))
+            for cur_det_dict in list_of_dict:
+                for cur_key in keys:
+                    dict_of_array[cur_key].append(
+                        cur_det_dict.get(cur_key, nan_or_none[cur_key])
+                    )
 
-            # Why should this code iterate over `keys`?!?
-            for k in keys:
-                dict_of_array = {k: np.array(dict_of_array[k]) for k in keys}
+            # So far, dict_of_array entries are plain lists. This converts them into NumPy arrays
+            dict_of_array = {
+                cur_key: _smart_list_to_numpy_array(dict_of_array[cur_key])
+                for cur_key in keys
+            }
         else:
             keys = None
             dict_of_array = {}
Expand All @@ -283,8 +300,8 @@ def _set_attributes_from_list_of_dict(self, list_of_dict, root):
         if self.comm and self.comm.size > 1:
             keys = self.comm.bcast(keys)
 
-        for k in keys:
-            self.setattr_det_global(k, dict_of_array.get(k), root)
+        for cur_key in keys:
+            self.setattr_det_global(cur_key, dict_of_array.get(cur_key), root)
 
     @property
     def n_samples_global(self):
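The `np.nan * value` probe in the fixed method is worth spelling out: multiplying by NaN succeeds for numeric values (giving a NaN default of matching kind) and raises `TypeError` for strings and other objects. A standalone demonstration with made-up detector fields:

```python
import numpy as np

list_of_dict = [{"net_ukrts": 50.0, "name": "det_A"}, {"name": "det_B"}]
keys = {k for d in list_of_dict for k in d}

nan_or_none = {}
for key in keys:
    sample = next(d[key] for d in list_of_dict if key in d)
    try:
        nan_or_none[key] = np.nan * sample  # numeric -> NaN
    except TypeError:
        nan_or_none[key] = None  # strings and other non-numeric values

dict_of_array = {k: [d.get(k, nan_or_none[k]) for d in list_of_dict] for k in keys}
print(dict_of_array)  # {'net_ukrts': [50.0, nan], 'name': ['det_A', 'det_B']}
```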
@@ -681,12 +698,11 @@ def setattr_det_global(self, name, info, root=0):
             setattr(self, name, info)
             return
 
-        if (
-            MPI_COMM_GRID.COMM_OBS_GRID == MPI_COMM_GRID.COMM_NULL
-        ):  # The process does not own any detector (and TOD)
+        if not MPI_COMM_GRID.is_this_process_in_grid():
+            # The process does not own any detector (and TOD)
             null_det = DetectorInfo()
             attribute = getattr(null_det, name, None)
-            value = np.array([0]) if isinstance(attribute, numbers.Number) else [None]
+            value = np.array([0]) if isinstance(attribute, numbers.Number) else None
             setattr(self, name, value)
             return
 
@@ -927,6 +943,10 @@ def get_pointings(

         return pointing_buffer, hwp_buffer
 
+    def no_mueller_hwp(self) -> bool:
+        "Return True if no detectors have defined a Mueller matrix for the HWP"
+        return (self.mueller_hwp is None) or all(m is None for m in self.mueller_hwp)
+
     def _set_mpi_subcommunicators(self):
         """
         This function splits the global MPI communicator into three kinds of
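A hedged sketch of how a map-maker can branch on the new helper (`obs` and the branch bodies are illustrative, not library code):

```python
def pick_tod_model(obs) -> str:
    if obs.no_mueller_hwp():
        # obs.mueller_hwp is None, or it is a per-detector list made only of
        # Nones: treat the HWP as ideal (or absent)
        return "ideal-hwp"
    # At least one detector carries a Mueller matrix describing its HWP
    return "mueller-hwp"
```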