"""Facilities for mapping cosmological parameters between frameworks.
The module :mod:`mapping` provides facilities for mapping the cosmological
constants and functions used by one body of code to another.
Each supported body of code has its own dedicated class.
"""
import typing
import warnings
from typing import final
import cosmosis.datablock
import numpy as np
import numpy.typing as npt
from pyccl import physical_constants as physics
from typing_extensions import assert_never
from firecrown import parameters
from firecrown.ccl_factory import (
Background,
CCLCalculatorArgs,
PowerSpec,
PoweSpecAmplitudeParameter,
)
from firecrown.descriptors import TypeFloat, TypeString
from firecrown.likelihood.likelihood import NamedParameters
[docs]
def build_ccl_background_dict(
*,
a: npt.NDArray[np.float64],
chi: npt.NDArray[np.float64],
h_over_h0: npt.NDArray[np.float64],
) -> Background:
"""Builds the CCL dictionary of background quantities.
:param a: The scale factor array
:param chi: The comoving distance array
:param h_over_h0: The Hubble parameter divided by H0
:return: the dictionary of background quantities
"""
return {"a": a, "chi": chi, "h_over_h0": h_over_h0}
[docs]
class Mapping:
"""Base class defining the interface for each supported code.
The interface describes a mapping of cosmological constants from some
concrete Boltzmann calculator to the form those constants take in CCL. Each
supported Boltzmann calculator must have its own concrete subclass.
The class attributes are all :mod:`firecrown.connector.descriptors`. This is
to control the allowed types for the instance attributes. A descriptor of
name 'x' will provide an apparent attribute with name `x` in
each object, as well as an entry `_x` in the object's `__dict__`.
"""
# pylint: disable-msg=too-many-instance-attributes
Omega_c = TypeFloat(minvalue=0.0, maxvalue=1.0)
Omega_b = TypeFloat(minvalue=0.0, maxvalue=1.0)
h = TypeFloat(minvalue=0.3, maxvalue=1.2)
A_s = TypeFloat(allow_none=True)
sigma8 = TypeFloat(allow_none=True)
n_s = TypeFloat(allow_none=True)
Omega_k = TypeFloat(minvalue=-1.0, maxvalue=1.0)
Neff = TypeFloat(minvalue=0.0)
m_nu_type = TypeString() # "inverted", "normal" or "list"
w0 = TypeFloat()
wa = TypeFloat()
T_CMB = TypeFloat()
def __init__(self) -> None:
"""Initialize the Mapping object."""
# We can have:
# a single neutrino mass (must be non-negative)
# a list of n neutrino masses (all must be non-negative)
# None, indicating that all neutrinos are massless
self.m_nu: float | list[float] | None = None
[docs]
def get_params_names(
self, amplitude: PoweSpecAmplitudeParameter = PoweSpecAmplitudeParameter.AS
) -> list[str]:
"""Return the names of the expected cosmological parameters for this mapping."""
warnings.warn(
f"This method is implementation specific and should only be "
f"implemented on the appropriated subclasses. This method"
f"is going to be removed in the next major release. {amplitude}",
category=DeprecationWarning,
)
return []
[docs]
@final
def set_params(
self,
*,
Omega_c: float,
Omega_b: float,
h: float,
A_s: None | float = None,
sigma8: None | float = None,
n_s: float,
Omega_k: float,
Neff: float,
m_nu: float | list[float] | None,
w0: float,
wa: float,
T_CMB: float,
):
"""Sets the cosmological constants suitable a pyccl.core.CosmologyCalculator.
See the documentation of that class for an explanation of the choices and
meanings of default values of None.
:param Omega_c: fraction of cold dark matter
:param Omega_b: fraction of baryons
:param h: dimensionless Hubble parameter; h = H0/(100 km/s/Mpc)
:param A_s: amplitude of the primordial power spectrum
:param sigma8: s.d. of the matter contrast on a scale of (8 Mpc)/h
:param n_s: scalar spectral index of primordial power spectrum
:param Omega_k: curvature of the universe
:param Neff: effective number of relativistic neutrino species
:param m_nu: effective mass of neutrinos; None for massless neutrinos
:param w0: constant of the CPL parameterization of the dark energy
equation of state
:param wa: linear coefficient of the CPL parameterization of the
dark energy equation of state
:param T_CMB: cosmic microwave background temperature today
"""
# Typecheck is done automatically using the descriptors and is done to
# avoid void very confusing error messages at a later time in case of
# error.
self.Omega_c = Omega_c
self.Omega_b = Omega_b
self.h = h
if (A_s is not None) == (sigma8 is not None):
raise ValueError("Exactly one of A_s and sigma8 must be supplied")
if sigma8 is None:
self.A_s = A_s
self.sigma8 = None
else:
self.A_s = None
self.sigma8 = sigma8
self.n_s = n_s
self.Omega_k = Omega_k
self.Neff = Neff
self.m_nu = m_nu
self.w0 = w0
self.wa = wa
self.T_CMB = T_CMB
[docs]
@staticmethod
def redshift_to_scale_factor(z: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
"""Converts redshift to scale factor.
Given arrays of redshift returns an array of scale factor with the
inverse order.
:param z: array of redshifts
:return: array of scale factors
"""
scale = np.flip(1.0 / (1.0 + z))
return scale
[docs]
@staticmethod
def redshift_to_scale_factor_p_k(
p_k: npt.NDArray[np.float64],
) -> npt.NDArray[np.float64]:
"""Converts power spectrum from redshift to scale factor.
Given an 2d arrays power spectrum ordered by (redshift, mode)
return a 2d array with the rows flipped to match the reordering
from redshift to scale factor.
:param p_k: a power spectrum, ordered by (redshift, mode)
:return: array of power spectrum, ordered by (scale factor, mode)
"""
p_k_out = np.flipud(p_k)
return p_k_out
[docs]
def asdict(self) -> dict[str, float]:
"""Return a dictionary containing the cosmological constants.
:return: the dictionary, containing keys:
- ``Omega_c``: fraction of cold dark matter
- ``Omega_b``: fraction of baryons
- ``h``: dimensionless Hubble parameter; h = H0/(100 km/s/Mpc)
- ``A_s``: amplitude of the primordial power spectrum
- ``sigma8``: s.d. of the matter contrast on a scale of (8 Mpc)/h
- ``n_s``: scalar spectral index of primordial power spectrum
- ``Omega_k``: curvature of the universe
- ``Neff``: effective number of relativistic neutrino species
- ``m_nu``: effective mass of neutrinos, or first neutrino
- ``m_nu_<n>``: effective mass of the nth neutrino, n >= 2
Note: the values of n are guaranteed to be consecutive.
- ``w0``: constant of the CPL parameterization of the dark energy
equation of state
- ``wa``: linear coefficient of the CPL parameterization of the
dark energy equation of state
- ``T_CMB``: cosmic microwave background temperature today
"""
cosmo_dict: dict[str, float] = {
"Omega_c": self.Omega_c,
"Omega_b": self.Omega_b,
"h": self.h,
"n_s": self.n_s,
"Omega_k": self.Omega_k,
"Neff": self.Neff,
"w0": self.w0,
"wa": self.wa,
"T_CMB": self.T_CMB,
}
if self.A_s is not None:
cosmo_dict["A_s"] = self.A_s
if self.sigma8 is not None:
cosmo_dict["sigma8"] = self.sigma8
match self.m_nu:
case None:
cosmo_dict["m_nu"] = 0.0
case float():
cosmo_dict["m_nu"] = self.m_nu
case list():
for n, mass in enumerate(self.m_nu):
if n == 0:
cosmo_dict["m_nu"] = mass
else:
cosmo_dict[f"m_nu_{n + 1}"] = mass
case _ as unreachable:
assert_never(unreachable)
return cosmo_dict
[docs]
def get_H0(self) -> float:
"""Return the value of H0.
:return: H0 in km/s/Mpc
"""
return self.h * 100.0
[docs]
class MappingCosmoSIS(Mapping):
"""Mapping support for CosmoSIS."""
[docs]
def get_params_names(
self, amplitude: PoweSpecAmplitudeParameter = PoweSpecAmplitudeParameter.AS
) -> list[str]:
"""Return the names of the expected cosmological parameters for this mapping.
:return: a list of the cosmological parameter names
"""
match amplitude:
case PoweSpecAmplitudeParameter.AS:
amplitude_name = "A_s"
case PoweSpecAmplitudeParameter.SIGMA8:
amplitude_name = "sigma_8"
case _ as unreachable:
assert_never(unreachable)
return [
"h0",
"omega_b",
"omega_c",
amplitude_name,
"n_s",
"omega_k",
"delta_neff",
"omega_nu",
"w",
"wa",
]
[docs]
def set_params_from_cosmosis(self, cosmosis_params: NamedParameters) -> None:
"""Set the parameters in this mapping from the given CosmoSIS parameters.
:param cosmosis_params: the cosmological parameters read from CosmoSIS
"""
# TODO: Verify that CosmoSIS/CAMB uses delta_neff, not N_eff
# Both delta_neff and n_eff appear in CosmoSIS and the CSL
h = cosmosis_params.get_float("h0")
Omega_b = cosmosis_params.get_float("omega_b")
Omega_c = cosmosis_params.get_float("omega_c")
sigma8 = cosmosis_params.get_float("sigma_8", 0.8)
n_s = cosmosis_params.get_float("n_s", 0.96)
Omega_k = cosmosis_params.get_float("omega_k")
# Read omega_nu from CosmoSIS (in newer CosmoSIS)
# Read m_nu from CosmoSIS (in newer CosmoSIS)
delta_neff = cosmosis_params.get_float("delta_neff", 0.0)
Neff = delta_neff + 3.046
omega_nu = cosmosis_params.get_float("omega_nu")
m_nu = omega_nu * h * h * 93.14
w0 = cosmosis_params.get_float("w")
wa = cosmosis_params.get_float("wa")
# pylint: disable=duplicate-code
self.set_params(
Omega_c=Omega_c,
Omega_b=Omega_b,
h=h,
sigma8=sigma8,
n_s=n_s,
Omega_k=Omega_k,
Neff=Neff,
m_nu=m_nu,
w0=w0,
wa=wa,
T_CMB=2.7255,
# Modify CosmoSIS to make this available in the datablock
)
# pylint: enable=duplicate-code
[docs]
def calculate_ccl_args(self, sample: cosmosis.datablock) -> CCLCalculatorArgs:
"""Calculate the arguments necessary for CCL for this sample.
:param sample: the datablock for the current sample
:return: the arguments required by CCL
"""
pk_linear: None | PowerSpec = None
pk_nonlin: None | PowerSpec = None
if sample.has_section("matter_power_lin"):
k = self.transform_k_h_to_k(sample["matter_power_lin", "k_h"])
z_mpl = sample["matter_power_lin", "z"]
scale_mpl = self.redshift_to_scale_factor(z_mpl)
p_k = self.transform_p_k_h3_to_p_k(sample["matter_power_lin", "p_k"])
p_k = self.redshift_to_scale_factor_p_k(p_k)
pk_linear = {
"a": scale_mpl,
"k": k,
"delta_matter:delta_matter": p_k,
}
if sample.has_section("matter_power_nl"):
k = self.transform_k_h_to_k(sample["matter_power_nl", "k_h"])
z_mpl = sample["matter_power_nl", "z"]
scale_mpl = self.redshift_to_scale_factor(z_mpl)
p_k = self.transform_p_k_h3_to_p_k(sample["matter_power_nl", "p_k"])
p_k = self.redshift_to_scale_factor_p_k(p_k)
pk_nonlin = {
"a": scale_mpl,
"k": k,
"delta_matter:delta_matter": p_k,
}
chi = np.flip(sample["distances", "d_m"])
scale_distances = self.redshift_to_scale_factor(sample["distances", "z"])
# h0 = sample["cosmological_parameters", "h0"]
# NOTE: The first value of the h_over_h0 array is non-zero because of the way
# CAMB does it calculation. We do not modify this, because we want consistency.
# hubble_radius_today = (ccl.physical_constants.CLIGHT * 1e-5) / h0
# h_over_h0 = np.flip(sample["distances", "h"]) * hubble_radius_today
h_over_h0 = self.transform_h_to_h_over_h0(sample["distances", "h"])
background: Background = build_ccl_background_dict(
a=scale_distances, chi=chi, h_over_h0=h_over_h0
)
ccl_args: CCLCalculatorArgs = {"background": background}
if pk_linear is not None:
ccl_args.update({"pk_linear": pk_linear})
if pk_nonlin is not None:
ccl_args.update({"pk_nonlin": pk_nonlin})
return ccl_args
[docs]
class MappingCAMB(Mapping):
"""Mapping support for PyCAMB, the Python version of CAMB.
Note that the Python version of CAMB uses some different conventions from
the Fortran version of CAMB. The two are not interchangeable.
"""
[docs]
def get_params_names(
self, amplitude: PoweSpecAmplitudeParameter = PoweSpecAmplitudeParameter.AS
) -> list[str]:
"""Return the names of the expected cosmological parameters for this mapping.
:return: a list of the cosmological parameter names
"""
match amplitude:
case PoweSpecAmplitudeParameter.AS:
amplitude_name = "As"
case PoweSpecAmplitudeParameter.SIGMA8:
amplitude_name = "sigma8"
case _ as unreachable:
assert_never(unreachable)
return [
"H0",
"ombh2",
"omch2",
"mnu",
"nnu",
"tau",
"YHe",
amplitude_name,
"ns",
"w",
"wa",
]
[docs]
def set_params_from_camb(self, params_values: parameters.ParamsMap) -> None:
"""Set the parameters in this mapping from the given CAMB-style parameters."""
# pylint: disable-msg=R0914
# CAMB can use different parameters in place of H0, we must deal with this
# possibility here.
H0 = params_values["H0"]
ns = params_values["ns"]
ombh2 = params_values["ombh2"]
omch2 = params_values["omch2"]
Neff = params_values["nnu"]
m_nu = params_values["mnu"]
Omega_k0 = params_values["omk"]
h0 = H0 / 100.0
h02 = h0 * h0
Omega_b0 = ombh2 / h02
Omega_c0 = omch2 / h02
w = params_values.get("w", -1.0)
wa = params_values.get("wa", 0.0)
if ("As" in params_values) == ("sigma8" in params_values):
raise ValueError("Exactly one of A_s and sigma8 must be supplied.")
if "As" in params_values:
As = params_values["As"]
sigma8 = None
else:
As = None
sigma8 = params_values["sigma8"]
# Here we have the following problem, some parameters used by CAMB
# are implicit, i.e., since they are not explicitly set the default
# ones are used. Thus, for instance, here we do not know which type of
# neutrino hierarchy is used. Reading cobaya interface to CAMB
# I noticed that it does not touch the neutrino variables and consequently
# in that case, we could assume that the defaults are being used.
# Nevertheless, we need a better solution.
self.set_params(
Omega_c=Omega_c0,
Omega_b=Omega_b0,
h=h0,
n_s=ns,
Omega_k=Omega_k0,
sigma8=sigma8,
A_s=As,
m_nu=m_nu,
w0=w,
wa=wa,
Neff=Neff,
T_CMB=2.7255, # Can we make cobaya set this?
)
mapping_classes: typing.Mapping[str, type[Mapping]] = {
"CAMB": MappingCAMB,
"CosmoSIS": MappingCosmoSIS,
}
[docs]
def mapping_builder(*, input_style: str, **kwargs) -> Mapping:
"""Return the Mapping class for the given input_style.
If input_style is not recognized raise an exception.
:param input_style: the name of the mapping
:param kwargs: the parameters of the mapping
:return: the mapping object
"""
if input_style not in mapping_classes:
raise ValueError(f"input_style must be {*mapping_classes, }, not {input_style}")
return mapping_classes[input_style](**kwargs)