Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate the parallel keyword and infer parallelization from n_proc > 1 #8

Merged
merged 2 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/examples/get_emission_ang.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import matplotlib.pyplot as plt
import numpy as np
from astropy.time import Time

from zodipy import Zodipy

model = Zodipy("dirbe")
Expand Down
6 changes: 3 additions & 3 deletions docs/examples/get_parallel_emission.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import multiprocessing
import time

import astropy.units as u
Expand All @@ -11,9 +10,10 @@
nside = 256
pixels = np.arange(hp.nside2npix(nside))
obs_time = Time("2020-01-01")
n_proc = 8

model = Zodipy()
model_parallel = Zodipy(parallel=True)
model_parallel = Zodipy(n_proc=n_proc)

start = time.perf_counter()
emission = model.get_binned_emission_pix(
Expand All @@ -33,7 +33,7 @@
obs_time=obs_time,
)
print(
f"Time spent on {multiprocessing.cpu_count()} CPUs:",
f"Time spent on {n_proc} CPUs:",
round(time.perf_counter() - start, 2),
"seconds",
)
Expand Down
4 changes: 2 additions & 2 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ read [Cosmoglobe: Simulating Zodiacal Emission with ZodiPy](https://arxiv.org/ab


## Parallelization
If you are not using ZodiPy in an already parallelized environment **and** are working with large pointing sequences, setting `parallel=True` when initializing `Zodipy` will improve the performance. ZodiPy will then automatically distribute the pointing to all available CPU's, given by `multiprocessing.cpu_count()` or to `n_proc` if this argument is provided.
If you are not using ZodiPy in an already parallelized environment, you may specify the number of cores used by ZodiPy through the `n_proc` keyword. By default `n_proc` is set to 1. For values of `n_proc` > 1, the line-of-sight calculations are parallelized using the `multiprocessing` module.

```python hl_lines="15 16"
{!examples/get_parallel_emission.py!}
Expand All @@ -102,7 +102,7 @@ If you are not using ZodiPy in an already parallelized environment **and** are w
!!! warning "Using ZodiPy in parallelized environments"
If ZodiPy is used in a parallelized environment one may have to specifically set the environment variable
`OMP_NUM_THREADS=1` to avoid oversubscription. This is due automatic parallelization in third party libraries such as `healpy` where for instance the `hp.Rotator` object automatically parallelizes rotation of unit vectors.
This means that when using ZodiPy with pointing in a coordinate system other than ecliptic, even if `Zodipy` is initialized with `parallel=False`, `healpy` will under the hood automatically distribute the pointing to available CPU's.
This means that when using ZodiPy with pointing in a coordinate system other than ecliptic, even if `Zodipy` is initialized with `n_proc`=1, `healpy` will under the hood automatically distribute the pointing to available CPU's.


## Visualizing the interplanetary dust distribution of a model
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ ignore = [
"PLR0913",
"ISC001"
]
exclude = ["tests/*"]
exclude = ["tests/*", "docs/*"]

[tool.ruff.pydocstyle]
convention = "google"
2 changes: 1 addition & 1 deletion tests/_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,5 @@ def model(draw: DrawFn, **static_params: dict[str, Any]) -> zodipy.Zodipy:
strategies.pop(key)

return draw(
builds(partial(zodipy.Zodipy, parallel=False, **static_params), **strategies)
builds(partial(zodipy.Zodipy, **static_params), **strategies)
)
34 changes: 31 additions & 3 deletions tests/test_get_emission.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,14 @@ def test_invalid_pixel(
obs=observer,
)


def test_multiprocessing() -> None:
"""
Testing that model with multiprocessing enabled returns the same value as
without multiprocessing.
"""

model = Zodipy(parallel=False)
model_parallel = Zodipy(parallel=True)
model = Zodipy()
model_parallel = Zodipy(n_proc=4)

observer = "earth"
time = Time("2020-01-01")
Expand Down Expand Up @@ -331,6 +330,35 @@ def test_multiprocessing() -> None:

assert np.allclose(emission_binned_ang.value, emission_binned_ang_parallel.value)

def test_inner_radial_cutoff_multiprocessing() -> None:
"""
Testing that model with inner radial cutoffs can be parallelized.
"""

model = Zodipy("RRM-experimental")
model_parallel = Zodipy("RRM-experimental", n_proc=4)

observer = "earth"
time = Time("2020-01-01")
frequency = 78 * u.micron
nside = 32
pix = np.random.randint(0, hp.nside2npix(nside), size=1000)

emission_pix = model.get_emission_pix(
frequency,
pixels=pix,
nside=nside,
obs_time=time,
obs=observer,
)
emission_pix_parallel = model_parallel.get_emission_pix(
frequency,
pixels=pix,
nside=nside,
obs_time=time,
obs=observer,
)
assert np.allclose(emission_pix, emission_pix_parallel)

@given(
model(extrapolate=True),
Expand Down
26 changes: 11 additions & 15 deletions zodipy/zodipy.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ class Zodipy:
`solar_cutoff` are masked. Defaults to `None`.
solar_cut_fill_value (float): Fill value for pixels masked with `solar_cut`.
Defaults to `np.nan`.
parallel (bool): If `True`, input pointing will be split among several cores, and
the emission will be computed in parallel. Default is `False`.
n_proc (int): Number of cores to use when `parallel` is `True`. Defaults is `None`,
which uses all available cores.
n_proc (int): Number of cores to use. If `n_proc` is greater than 1, the line-of-sight
integrals are parallelized using the `multiprocessing` module. Defaults to 1.

"""

Expand All @@ -78,8 +76,7 @@ def __init__(
ephemeris: str = "de432s",
solar_cut: u.Quantity[u.deg] | None = None,
solar_cut_fill_value: float = np.nan,
parallel: bool = False,
n_proc: int | None = None,
n_proc: int = 1,
) -> None:
self.model = model
self.gauss_quad_degree = gauss_quad_degree
Expand All @@ -88,7 +85,6 @@ def __init__(
self.ephemeris = ephemeris
self.solar_cut = solar_cut.to(u.rad) if solar_cut is not None else solar_cut
self.solar_cut_fill_value = solar_cut_fill_value
self.parallel = parallel
self.n_proc = n_proc

self._interpolator = partial(
Expand Down Expand Up @@ -450,18 +446,18 @@ def _compute_emission(
**source_parameters["common"],
)

if self.parallel:
n_proc = multiprocessing.cpu_count() if self.n_proc is None else self.n_proc

unit_vector_chunks = np.array_split(unit_vectors, n_proc, axis=-1)
if self.n_proc > 1:
unit_vector_chunks = np.array_split(unit_vectors, self.n_proc, axis=-1)
integrated_comp_emission = np.zeros((len(self._ipd_model.comps), unit_vectors.shape[1]))
with multiprocessing.get_context(SYS_PROC_START_METHOD).Pool(processes=n_proc) as pool:
with multiprocessing.get_context(SYS_PROC_START_METHOD).Pool(
processes=self.n_proc
) as pool:
for idx, comp_label in enumerate(self._ipd_model.comps.keys()):
stop_chunks = np.array_split(stop[comp_label], n_proc, axis=-1)
stop_chunks = np.array_split(stop[comp_label], self.n_proc, axis=-1)
if start[comp_label].size == 1:
start_chunks = [start[comp_label]] * n_proc
start_chunks = [start[comp_label]] * self.n_proc
else:
start_chunks = np.array_split(start[comp_label], n_proc, axis=-1)
start_chunks = np.array_split(start[comp_label], self.n_proc, axis=-1)
comp_integrands = [
partial(
common_integrand,
Expand Down
Loading