Source code for py21cmfast.wrapper.cfuncs

"""Low-level python wrappers of C functions."""

import logging
import warnings
from collections.abc import Sequence
from typing import Literal

import numpy as np
from numpy.typing import NDArray
from scipy.interpolate import interp1d

from .._cfg import config
from ..c_21cmfast import ffi, lib
from ..drivers._global_initialization import init_c_state
from ..drivers.global_evolution import GlobalEvolution, run_global_evolution
from ..drivers.lightcone import LightCone
from ._utils import _process_exitcode
from .inputs import InputParameters

[docs] logger = logging.getLogger(__name__)
# TODO: a lot of these assume input as numpy arrays via use of .shape, explicitly require this @init_c_state(sigma=True)
[docs] def get_expected_nhalo(*, redshift: float, inputs: InputParameters) -> int: """Get the expected number of halos in a given box. Parameters ---------- redshift : float The redshift at which to calculate the halo list. inputs: :class:`~InputParameters` The input parameters of the run Returns ------- n_halo : float The expected number of halos in the box at the given redshift under the given model. Raises ------ ValueError : If the matter options do not have a discrete halo model. """ if not inputs.matter_options.has_discrete_halos: raise ValueError( "SOURCE_MODEL must have a discrete halo model in order to calculate the expected number of halos in the box. " "Change SOURCE_MODEL to either 'DEXM-ESF' or 'CHMF-SAMPLER' in order to use this function." ) return lib.expected_nhalo( redshift, )
@init_c_state(sigma=True)
[docs] def get_halo_catalog_buffer_size( *, redshift: float, inputs: InputParameters, min_size: int = 1000000 ) -> int: """Compute the required size of the memory buffer to hold a halo list. Parameters ---------- redshift : float The redshift at which to calculate the halo list. inputs: :class:`~InputParameters` The input parameters of the run min_size : int, optional A minimum size to be used as the buffer. """ # find the buffer size from expected halos in the box hbuffer_size = get_expected_nhalo( redshift=redshift, inputs=inputs, ) hbuffer_size = int((hbuffer_size + 1) * config["HALO_CATALOG_MEM_FACTOR"]) # set a minimum in case of fluctuation at high z return int(max(hbuffer_size, min_size))
@init_c_state(broadcast_inputs=True)
[docs] def compute_mturns( *, inputs: InputParameters, redshifts: float | Sequence[float], J_LW_21: float | Sequence[float], v_cb: float | Sequence[float], ionisation_rate_G12: float | Sequence[float], z_reion: float | Sequence[float], ) -> tuple[float, float]: """ Compute the turnover masses for both ACGs and MCGs at a given redshift. Parameters ---------- redshifts : array-like The redshifts at which to compute the turnover masses. J_LW_21 : array-like The Lyman-Werner flux in units of 1e-21 erg/s/Hz/cm^2/sr at the given redshifts. v_cb : array-like The amplitude of the relative velocity between dark matter and baryons in units of km/s at the given redshifts. ionisation_rate_G12 : array-like The ionisation rate in units of 1e-12 s^-1 at the given redshifts. z_reion : array-like The reionisation redshift at the given redshifts. Returns ------- M_turn_a : array-like The turnover mass for atomic cooling halos at the given redshifts. M_turn_m : array-like The turnover mass for molecular cooling halos at the given redshifts. Raises ------ ValueError : If the input arrays do not have the same shape. """ inputs_to_check = { "J_LW_21": J_LW_21, "v_cb": v_cb, "ionisation_rate_G12": ionisation_rate_G12, "z_reion": z_reion, } redshifts_shape = np.asarray(redshifts).shape for name, value in inputs_to_check.items(): current_shape = np.asarray(value).shape if current_shape != redshifts_shape: raise ValueError( f"The shapes of redshifts and {name} are not the same! " f"Got {redshifts_shape} and {current_shape}." ) M_turn_a_ffi = ffi.new("double *") M_turn_m_ffi = ffi.new("double *") def _scalar_call(z, j, v, g, zr): lib.compute_mturns(z, j, v, g, zr, M_turn_a_ffi, M_turn_m_ffi) return M_turn_a_ffi[0], M_turn_m_ffi[0] vfunc = np.vectorize(_scalar_call, otypes=[np.float64, np.float64]) M_turn_a, M_turn_m = vfunc(redshifts, J_LW_21, v_cb, ionisation_rate_G12, z_reion) if M_turn_a.ndim == 0: # scalar input case return float(M_turn_a), float(M_turn_m) return M_turn_a, M_turn_m
@init_c_state(broadcast_inputs=True)
[docs] def compute_tau( *, redshifts: Sequence[float], global_xHI: Sequence[float], inputs: InputParameters, z_re_HeII: float = 3.0, ) -> float: """Compute the optical depth to reionization under the given model. Parameters ---------- redshifts : array-like Redshifts defining an evolution of the neutral fraction. global_xHI : array-like The mean neutral fraction at `redshifts`. inputs : :class:`~InputParameters` Defines the input parameters of the run z_re_HeII : float, optional The redshift at which helium reionization occurs. Returns ------- tau : float The optical depth to reionization Raises ------ ValueError : If `redshifts` and `global_xHI` have inconsistent length or if redshifts are not in ascending order. """ if len(redshifts) != len(global_xHI): raise ValueError("redshifts and global_xHI must have same length") if not np.all(np.diff(redshifts) > 0): raise ValueError("redshifts and global_xHI must be in ascending order") # Convert the data to the right type redshifts = np.array(redshifts, dtype="float32") global_xHI = np.array(global_xHI, dtype="float32") z = ffi.cast("float *", ffi.from_buffer(redshifts)) xHI = ffi.cast("float *", ffi.from_buffer(global_xHI)) # Run the C code return lib.ComputeTau( len(redshifts), z, xHI, z_re_HeII, )
@init_c_state(sigma=True)
[docs] def compute_luminosity_function( *, redshifts: Sequence[float], inputs: InputParameters, nbins: int = 100, mturnovers: np.ndarray | None = None, mturnovers_mini: np.ndarray | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, component: Literal["both", "acg", "mcg"] = "both", ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute a the luminosity function over a given number of bins and redshifts. Parameters ---------- redshifts : array-like The redshifts at which to compute the luminosity function. inputs: :class:`~InputParameters` The input parameters defining the simulation run. nbins : int, optional The number of luminosity bins to produce for the luminosity function. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. component : str, {'both', 'acg', 'mcg} The component of the LF to be calculated. Forced to be 'acg' if USE_MINI_HALOS is False. Returns ------- Muvfunc : np.ndarray Magnitude array (i.e. brightness). Shape [nredshifts, nbins] Mhfunc : np.ndarray Halo mass array. Shape [nredshifts, nbins] lfunc : np.ndarray Number density of haloes corresponding to each bin defined by `Muvfunc`. Shape [nredshifts, nbins]. """ astro_options = inputs.astro_options redshifts = np.array(redshifts, dtype="float32") if (mturnovers is not None) or (mturnovers_mini is not None): raise TypeError( "`mturnovers` and `mturnovers_mini` have been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) if not astro_options.USE_MINI_HALOS and component != "acg": warnings.warn( "USE_MINI_HALOS is False, so only ACG LFs are computed.", stacklevel=2, ) component = "acg" if lightcone is not None: mturnovers_global = pow(10.0, lightcone.global_quantities["log10_mturn_acg"]) mturnovers_mini_global = pow( 10.0, lightcone.global_quantities["log10_mturn_mcg"] ) else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) mturnovers_global = pow(10.0, global_evolution.quantities["log10_mturn_acg"]) mturnovers_mini_global = pow( 10.0, global_evolution.quantities["log10_mturn_mcg"] ) # Interpolate the mturnover arrays at the input redshifts mturnovers = np.interp( redshifts, inputs.node_redshifts[::-1], mturnovers_global[::-1] ) mturnovers_mini = np.interp( redshifts, inputs.node_redshifts[::-1], mturnovers_mini_global[::-1] ) lfunc = np.zeros(len(redshifts) * nbins) Muvfunc = np.zeros(len(redshifts) * nbins) Mhfunc = np.zeros(len(redshifts) * nbins) lfunc.shape = (len(redshifts), nbins) Muvfunc.shape = (len(redshifts), nbins) Mhfunc.shape = (len(redshifts), nbins) c_Muvfunc = ffi.cast("double *", ffi.from_buffer(Muvfunc)) c_Mhfunc = ffi.cast("double *", ffi.from_buffer(Mhfunc)) c_lfunc = ffi.cast("double *", ffi.from_buffer(lfunc)) lfunc_MINI = np.zeros(len(redshifts) * nbins) Muvfunc_MINI = np.zeros(len(redshifts) * nbins) Mhfunc_MINI = np.zeros(len(redshifts) * nbins) lfunc_MINI.shape = (len(redshifts), nbins) Muvfunc_MINI.shape = (len(redshifts), nbins) Mhfunc_MINI.shape = (len(redshifts), nbins) c_Muvfunc_MINI = ffi.cast("double *", ffi.from_buffer(Muvfunc_MINI)) c_Mhfunc_MINI = ffi.cast("double *", ffi.from_buffer(Mhfunc_MINI)) c_lfunc_MINI = ffi.cast("double *", ffi.from_buffer(lfunc_MINI)) if component in ("both", "acg"): # Run the C code errcode = lib.ComputeLF( nbins, 1, len(redshifts), ffi.cast("float *", ffi.from_buffer(redshifts)), ffi.cast("float *", ffi.from_buffer(mturnovers)), c_Muvfunc, c_Mhfunc, c_lfunc, ) _process_exitcode( errcode, lib.ComputeLF, ( nbins, 1, len(redshifts), ), ) if component in ("both", "mcg"): # Run the C code errcode = lib.ComputeLF( nbins, 2, len(redshifts), ffi.cast("float *", ffi.from_buffer(redshifts)), ffi.cast("float *", ffi.from_buffer(mturnovers_mini)), c_Muvfunc_MINI, c_Mhfunc_MINI, c_lfunc_MINI, ) _process_exitcode( errcode, lib.ComputeLF, ( nbins, 2, len(redshifts), ), ) if component == "both": # redo the Muv range using the faintest (most likely MINI) and the brightest (most likely massive) lfunc_all = np.zeros(len(redshifts) * nbins) Muvfunc_all = np.zeros(len(redshifts) * nbins) Mhfunc_all = np.zeros(len(redshifts) * nbins * 2) lfunc_all.shape = (len(redshifts), nbins) Muvfunc_all.shape = (len(redshifts), nbins) Mhfunc_all.shape = (len(redshifts), nbins, 2) for iz in range(len(redshifts)): Muvfunc_all[iz] = np.linspace( np.min([Muvfunc.min(), Muvfunc_MINI.min()]), np.max([Muvfunc.max(), Muvfunc_MINI.max()]), nbins, ) lfunc_all[iz] = np.log10( 10 ** ( interp1d(Muvfunc[iz], lfunc[iz], fill_value="extrapolate")( Muvfunc_all[iz] ) ) + 10 ** ( interp1d( Muvfunc_MINI[iz], lfunc_MINI[iz], fill_value="extrapolate" )(Muvfunc_all[iz]) ) ) Mhfunc_all[iz] = np.array( [ interp1d(Muvfunc[iz], Mhfunc[iz], fill_value="extrapolate")( Muvfunc_all[iz] ), interp1d( Muvfunc_MINI[iz], Mhfunc_MINI[iz], fill_value="extrapolate" )(Muvfunc_all[iz]), ], ).T lfunc_all[lfunc_all <= -30] = np.nan return Muvfunc_all, Mhfunc_all, lfunc_all elif component == "acg": lfunc[lfunc <= -30] = np.nan return Muvfunc, Mhfunc, lfunc elif component == "mcg": lfunc_MINI[lfunc_MINI <= -30] = np.nan return Muvfunc_MINI, Mhfunc_MINI, lfunc_MINI else: raise ValueError( f"Unknown component '{component}'. Must be 'both', 'acg' or 'mcg'" )
@init_c_state(ps=True)
[docs] def get_matter_power_values( *, inputs: InputParameters, k_values: Sequence[float], ): """Evaluate the matter density power spectrum (at z=0) at a certain scale from the 21cmFAST backend.""" return np.vectorize(lib.power_in_k)(k_values)
@init_c_state(ps=True)
[docs] def get_vcb_power_values( *, inputs: InputParameters, k_values: Sequence[float], ): """Evaluate the vcb power spectrum (at kinematic decoupling) at a certain scale from the 21cmFAST backend.""" if inputs.matter_options.USE_RELATIVE_VELOCITIES: return np.vectorize(lib.power_in_vcb)(k_values) else: raise ValueError( "inputs.matter_options.USE_RELATIVE_VELOCITIES must be True in order to compute the v_cb power spectrum." )
@init_c_state(sigma=True)
[docs] def evaluate_sigma( *, inputs: InputParameters, masses: NDArray[np.floating], ): """ Evaluate the variance of a mass scale. Uses the 21cmfast backend """ masses = masses.astype("f8") sigma = np.zeros_like(masses) dsigmasq = np.zeros_like(masses) lib.get_sigma( masses.size, ffi.cast("double *", ffi.from_buffer(masses)), ffi.cast("double *", ffi.from_buffer(sigma)), ffi.cast("double *", ffi.from_buffer(dsigmasq)), ) return sigma, dsigmasq
@init_c_state(broadcast_inputs=True)
[docs] def get_growth_factor( *, inputs: InputParameters, redshift: float, ): """Get the growth factor at a given redshift.""" return lib.dicke(redshift)
[docs] def get_condition_mass(inputs: InputParameters, R: float): """Determine condition masses for backend routines. Returns either mass contained within a radius, or mass of the Lagrangian cell on HII_DIM """ rhocrit = ( inputs.cosmo_params.cosmo.critical_density(0).to("M_sun Mpc-3").value * inputs.cosmo_params.OMm ) if R == "cell": volume = ( inputs.simulation_options.BOX_LEN / inputs.simulation_options.HII_DIM ) ** 3 else: volume = 4.0 / 3.0 * np.pi * R**3 return volume * rhocrit
@init_c_state(broadcast_inputs=True)
[docs] def get_delta_crit(*, inputs: InputParameters, mass: float, redshift: float): """Get the critical collapse density given a mass, redshift and parameters.""" sigma, _ = evaluate_sigma(inputs=inputs, masses=np.array([mass])) growth = get_growth_factor(inputs=inputs, redshift=redshift) return get_delta_crit_nu(inputs.matter_options.cdict["HMF"], sigma[0], growth)
[docs] def get_delta_crit_nu(hmf_int_flag: int, sigma: float, growth: float): """Get the critical density from sigma and growth factor.""" # None of the parameter structs are used in this function so we don't need a broadcast return lib.get_delta_crit(hmf_int_flag, sigma, growth)
@init_c_state(sigma=True)
[docs] def evaluate_condition_integrals( inputs: InputParameters, cond_array: NDArray[np.floating], redshift: float, redshift_prev: float | None = None, ): """Get the expected number and mass of halos given a condition. If USE_INTERPOLATION_TABLES is set to 'hmf-interpolation': Will crash if the table has not been initialised, only `cond_array` is used, and the rest of the arguments are taken from when the table was initialised. """ cond_array = cond_array.astype("f8") n_halo = np.zeros_like(cond_array) m_coll = np.zeros_like(cond_array) lib.get_condition_integrals( redshift, redshift_prev if redshift_prev is not None else -1, cond_array.size, ffi.cast("double *", ffi.from_buffer(cond_array)), ffi.cast("double *", ffi.from_buffer(n_halo)), ffi.cast("double *", ffi.from_buffer(m_coll)), ) return n_halo, m_coll
@init_c_state(sigma=True)
[docs] def integrate_chmf_interval( inputs: InputParameters, redshift: float, lnm_lower: NDArray[np.floating], lnm_upper: NDArray[np.floating], cond_values: NDArray[np.floating], redshift_prev: float | None = None, ): """Evaluate conditional mass function integrals at a range of mass intervals.""" if lnm_lower.shape != lnm_upper.shape: raise ValueError("the shapes of the two mass-limit arrays must be equal") assert np.all(lnm_lower < lnm_upper) out_prob = np.zeros((len(cond_values), len(lnm_lower)), dtype="f8") cond_values = cond_values.astype("f8") lnm_lower = lnm_lower.astype("f8") lnm_upper = lnm_upper.astype("f8") lib.get_halo_chmf_interval( redshift, redshift_prev if redshift_prev is not None else -1, len(cond_values), ffi.cast("double *", ffi.from_buffer(cond_values)), len(lnm_lower), ffi.cast("double *", ffi.from_buffer(lnm_lower)), ffi.cast("double *", ffi.from_buffer(lnm_upper)), ffi.cast("double *", ffi.from_buffer(out_prob)), ) return out_prob
@init_c_state(sigma=True)
[docs] def evaluate_inverse_table( inputs: InputParameters, cond_array: NDArray[np.floating], probabilities: NDArray[np.floating], redshift: float, redshift_prev: float | None = None, ): """Get the expected number and mass of halos given a condition.""" if cond_array.shape != probabilities.shape: raise ValueError( "the shapes of the input arrays `cond_array` and `probabilities" " must be equal." ) if redshift_prev is None: redshift_prev = -1 cond_array = cond_array.astype("f8") probabilities = probabilities.astype("f8") masses = np.zeros_like(cond_array) lib.get_halomass_at_probability( redshift, redshift_prev, cond_array.size, ffi.cast("double *", ffi.from_buffer(cond_array)), ffi.cast("double *", ffi.from_buffer(probabilities)), ffi.cast("double *", ffi.from_buffer(masses)), ) return masses
@init_c_state(sigma=True)
[docs] def evaluate_FgtrM_cond( inputs: InputParameters, densities: NDArray[np.floating], redshift: float, R: float, ): """Get the collapsed fraction from the backend, given a density and condition sigma.""" densities = densities.astype("f8") fcoll = np.zeros_like(densities) dfcoll = np.zeros_like(densities) lib.get_conditional_FgtrM( redshift, R, densities.size, ffi.cast("double *", ffi.from_buffer(densities)), ffi.cast("double *", ffi.from_buffer(fcoll)), ffi.cast("double *", ffi.from_buffer(dfcoll)), ) return fcoll, dfcoll
@init_c_state(sigma=True)
[docs] def evaluate_SFRD_z( *, inputs: InputParameters, redshifts: NDArray[np.floating], log10mturns: NDArray[np.floating] | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, ): """ Evaluate the global star formation rate density (in units of M_sun/s/Mpc^3) expected at a range of redshifts. Parameters ---------- inputs: :class:`~InputParameters` The input parameters defining the simulation run. redshifts : array-like The redshifts at which to compute the SFRD. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. Returns ------- sfrd : np.ndarray The global star formation rate density at the given redshifts for ACGs. sfrd_mini : np.ndarray or None The global star formation rate density at the given redshifts for MCGs. Will be None if `USE_MINI_HALOS` is False. """ if log10mturns is not None: raise TypeError( "`log10mturns` has been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) if inputs.astro_options.USE_MINI_HALOS: if lightcone is not None: log10mturns_mini_global = lightcone.global_quantities["log10_mturn_mcg"] else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) log10mturns_mini_global = global_evolution.quantities["log10_mturn_mcg"] log10mturns_mini = np.interp( redshifts, inputs.node_redshifts[::-1], log10mturns_mini_global[::-1] ) else: log10mturns_mini = np.zeros_like(redshifts) # dummy value for no mini halos redshifts = np.asarray(redshifts).astype("f8") log10mturns_mini = log10mturns_mini.astype("f8") sfrd = np.zeros_like(redshifts) sfrd_mini = np.zeros_like(redshifts) lib.get_global_SFRD_z( redshifts.size, ffi.cast("double *", ffi.from_buffer(redshifts)), ffi.cast("double *", ffi.from_buffer(log10mturns_mini)), ffi.cast("double *", ffi.from_buffer(sfrd)), ffi.cast("double *", ffi.from_buffer(sfrd_mini)), ) if not inputs.astro_options.USE_MINI_HALOS: sfrd_mini = None return sfrd, sfrd_mini
@init_c_state(sigma=True)
[docs] def evaluate_Nion_z( *, inputs: InputParameters, redshifts: NDArray[np.floating], log10mturns: NDArray[np.floating] | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, ): """ Evaluate the global number of ionising photons per baryon, expected at a range of redshifts. Parameters ---------- inputs: :class:`~InputParameters` The input parameters defining the simulation run. redshifts : array-like The redshifts at which to compute Nion. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. Returns ------- nion : np.ndarray The global number of ionising photons per baryon at the given redshifts for ACGs. nion_mini : np.ndarray or None The global number of ionising photons per baryon at the given redshifts for MCGs. Will be None if `USE_MINI_HALOS` is False. """ if log10mturns is not None: raise TypeError( "`log10mturns` has been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) if inputs.astro_options.USE_MINI_HALOS: if lightcone is not None: log10mturns_mini_global = lightcone.global_quantities["log10_mturn_mcg"] else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) log10mturns_mini_global = global_evolution.quantities["log10_mturn_mcg"] log10mturns_mini = np.interp( redshifts, inputs.node_redshifts[::-1], log10mturns_mini_global[::-1] ) else: log10mturns_mini = np.zeros_like(redshifts) # dummy value for no mini halos redshifts = np.asarray(redshifts).astype("f8") log10mturns_mini = log10mturns_mini.astype("f8") nion = np.zeros_like(redshifts) nion_mini = np.zeros_like(redshifts) lib.get_global_Nion_z( redshifts.size, ffi.cast("double *", ffi.from_buffer(redshifts)), ffi.cast("double *", ffi.from_buffer(log10mturns_mini)), ffi.cast("double *", ffi.from_buffer(nion)), ffi.cast("double *", ffi.from_buffer(nion_mini)), ) if not inputs.astro_options.USE_MINI_HALOS: nion_mini = None return nion, nion_mini
@init_c_state(sigma=True)
[docs] def evaluate_SFRD_cond( *, inputs: InputParameters, redshift: float, radius: float, densities: NDArray[np.floating], log10mturns: NDArray[np.floating] | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, ): """ Evaluate the conditional star formation rate density (in units of M_sun/s/Mpc^3) expected at a range of densities. Parameters ---------- inputs: :class:`~InputParameters` The input parameters defining the simulation run. redshift : float The redshift at which to compute the SFRD. radius : float The radius of the region at which to compute the conditional SFRD. densities : array-like The densities at which to compute the conditional SFRD. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. Returns ------- sfrd : np.ndarray The conditional star formation rate density at the given redshift and radius for ACGs. sfrd_mini : np.ndarray or None The conditional star formation rate density at the given redshift and radius for MCGs. Will be None if `USE_MINI_HALOS` is False. Notes ----- This function estimates the conditional SFRD by using the global turnover masses. In reality, these turnover masses do not depend solely on redshift, but also on the local density field, as well on its environment and history. Since it is impossible to well-define the conditional SFRD in given region by only providing redshift and density, we approximate the used turnover masses in this calculation to be the global ones. """ if log10mturns is not None: raise TypeError( "`log10mturns` has been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) if inputs.astro_options.USE_MINI_HALOS: if lightcone is not None: log10mturns_mini_global = lightcone.global_quantities["log10_mturn_mcg"] else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) log10mturns_mini_global = global_evolution.quantities["log10_mturn_mcg"] log10mturn_mini = np.interp( redshift, inputs.node_redshifts[::-1], log10mturns_mini_global[::-1] ) else: log10mturn_mini = 0.0 # dummy value for no mini halos densities = densities.astype("f8") sfrd = np.zeros_like(densities) sfrd_mini = np.zeros_like(densities) lib.get_conditional_SFRD( redshift, radius, densities.size, ffi.cast("double *", ffi.from_buffer(densities)), log10mturn_mini, ffi.cast("double *", ffi.from_buffer(sfrd)), ffi.cast("double *", ffi.from_buffer(sfrd_mini)), ) if not inputs.astro_options.USE_MINI_HALOS: sfrd_mini = None return sfrd, sfrd_mini
@init_c_state(sigma=True)
[docs] def evaluate_Nion_cond( *, inputs: InputParameters, redshift: float, radius: float, densities: NDArray[np.floating], l10mturns_acg: NDArray[np.floating] | None = None, l10mturns_mcg: NDArray[np.floating] | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, ): """ Evaluate the global number of ionising photons per baryon, expected at a range of densities. Parameters ---------- inputs: :class:`~InputParameters` The input parameters defining the simulation run. redshift : float The redshift at which to compute Nion. radius : float The radius of the region at which to compute the conditional Nion. densities : array-like The densities at which to compute the conditional Nion. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. Returns ------- nion : np.ndarray The conditional number of ionising photons per baryon at the given redshift and radius for ACGs. nion_mini : np.ndarray or None The conditional number of ionising photons per baryon at the given redshift and radius for MCGs. Will be None if `USE_MINI_HALOS` is False. Notes ----- This function estimates the conditional N_ion by using the global turnover masses. In reality, these turnover masses do not depend solely on redshift, but also on the local density field, as well on its environment and history. Since it is impossible to well-define the conditional N_ion in given region by only providing redshift and density, we approximate the used turnover masses in this calculation to be the global ones. """ if (l10mturns_acg is not None) or (l10mturns_mcg is not None): raise TypeError( "`l10mturns_acg` and `l10mturns_mcg` have been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) # TODO: Why this function is the only one that needs the global mturnover values for ACGs? if lightcone is not None: log10mturns_global = lightcone.global_quantities["log10_mturn_acg"] log10mturns_mini_global = lightcone.global_quantities["log10_mturn_mcg"] else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) log10mturns_global = global_evolution.quantities["log10_mturn_acg"] log10mturns_mini_global = global_evolution.quantities["log10_mturn_mcg"] log10mturn_acg = np.interp( redshift, inputs.node_redshifts[::-1], log10mturns_global[::-1] ) log10mturn_mcg = np.interp( redshift, inputs.node_redshifts[::-1], log10mturns_mini_global[::-1] ) densities = densities.astype("f8") nion = np.zeros_like(densities) nion_mini = np.zeros_like(densities) lib.get_conditional_Nion( redshift, radius, densities.size, ffi.cast("double *", ffi.from_buffer(densities)), log10mturn_acg, log10mturn_mcg, ffi.cast("double *", ffi.from_buffer(nion)), ffi.cast("double *", ffi.from_buffer(nion_mini)), ) if not inputs.astro_options.USE_MINI_HALOS: nion_mini = None return nion, nion_mini
@init_c_state(sigma=True)
[docs] def evaluate_Xray_cond( *, inputs: InputParameters, redshift: float, radius: float, densities: NDArray[np.floating], log10mturns: NDArray[np.floating] | None = None, lightcone: LightCone | None = None, global_evolution: GlobalEvolution | None = None, ): """ Evaluate the conditional X-ray emissivity (in units of erg/s/Mpc^3) expected at a range of densities. Parameters ---------- inputs: :class:`~InputParameters` The input parameters defining the simulation run. redshift : float The redshift at which to compute the conditional X-ray emissivity. radius : float The radius of the region at which to compute the conditional X-ray emissivity. densities : array-like The densities at which to compute the conditional X-ray emissivity. lightcone : :class:`~LightCone` or None, optional The lightcone object to use for the computation. If None, the function will consider `global_evolution` for the global m_turnover values, otherwise they will be extracted from the given lightcone. global_evolution : :class:`~GlobalEvolution` or None, optional The global evolution object to use for the computation. If None, the function will run a global evolution to estimate the global m_turnover values, otherwise they will be extracted from the given global evolution. Returns ------- xray_emissivity : np.ndarray The conditional X-ray emissivity at the given redshift and radius for ACGs and MCGs combined. Notes ----- This function estimates the conditional X-ray emissivity by using the global turnover masses. In reality, these turnover masses do not depend solely on redshift, but also on the local density field, as well on its environment and history. Since it is impossible to well-define the conditional X-ray emissivity in given region by only providing redshift and density, we approximate the used turnover masses in this calculation to be the global ones. """ if log10mturns is not None: raise TypeError( "`log10mturns` has been removed. " "Please use the `lightcone` or `global_evolution` arguments instead, " "or leave unspecified and they will be estimated automatically." ) if inputs.astro_options.USE_MINI_HALOS: if lightcone is not None: log10mturns_mini_global = lightcone.global_quantities["log10_mturn_mcg"] else: # If lightcone is not provided, we estimate the turnover masses from the global evolution if global_evolution is None: global_evolution = run_global_evolution(inputs=inputs) log10mturns_mini_global = global_evolution.quantities["log10_mturn_mcg"] log10mturn_mini = np.interp( redshift, inputs.node_redshifts[::-1], log10mturns_mini_global[::-1] ) else: log10mturn_mini = 0.0 # dummy value for no mini halos densities = densities.astype("f8") xray_emissivity = np.zeros_like(densities) lib.get_conditional_Xray( redshift, radius, densities.size, ffi.cast("double *", ffi.from_buffer(densities)), log10mturn_mini, ffi.cast("double *", ffi.from_buffer(xray_emissivity)), ) return xray_emissivity
@init_c_state(sigma=True)
[docs] def sample_halos_from_conditions( *, inputs: InputParameters, redshift: float, cond_array, redshift_prev: float | None = None, buffer_size: int | None = None, ): """Construct a halo sample given a descendant catalogue and redshifts.""" z_prev = -1 if redshift_prev is None else redshift_prev if buffer_size is None: buffer_size = get_halo_catalog_buffer_size(inputs=inputs, redshift=redshift) n_cond = cond_array.size # all coordinates zero crd_in = np.zeros(3 * n_cond).astype("f4") cond_array = cond_array.astype("f4") nhalo_out = np.zeros(1).astype("i4") N_out = np.zeros(n_cond).astype("i4") M_out = np.zeros(n_cond).astype("f8") exp_M = np.zeros(n_cond).astype("f8") exp_N = np.zeros(n_cond).astype("f8") halomass_out = np.zeros(buffer_size).astype("f4") halocrd_out = np.zeros(int(3 * buffer_size)).astype("i4") lib.single_test_sample( inputs.random_seed, n_cond, ffi.cast("float *", cond_array.ctypes.data), ffi.cast("float *", crd_in.ctypes.data), redshift, z_prev, ffi.cast("int *", nhalo_out.ctypes.data), ffi.cast("int *", N_out.ctypes.data), ffi.cast("double *", exp_N.ctypes.data), ffi.cast("double *", M_out.ctypes.data), ffi.cast("double *", exp_M.ctypes.data), ffi.cast("float *", halomass_out.ctypes.data), ffi.cast("float *", halocrd_out.ctypes.data), ) return { "n_halo_total": nhalo_out[0], "halo_masses": halomass_out, "n_progenitors": N_out, "progenitor_mass": M_out, "expected_progenitors": exp_N, "expected_progenitor_mass": exp_M, }
@init_c_state(broadcast_inputs=True)
[docs] def convert_halo_properties( *, redshift: float, inputs: InputParameters, halo_masses: NDArray[np.floating], star_rng: NDArray[np.floating], sfr_rng: NDArray[np.floating], xray_rng: NDArray[np.floating], halo_coords: NDArray[np.floating] | None = None, vcb_grid: NDArray[np.floating] | None = None, J_21_LW_grid: NDArray[np.floating] | None = None, z_re_grid: NDArray[np.floating] | None = None, Gamma12_grid: NDArray[np.floating] | None = None, ): """ Convert a halo catalogue's mass and RNG fields to halo properties. Assumes no feedback (Lyman-Werner, reionization). Returns a dict of 12 properties per halo: halo mass stellar mass (ACG) star formation rate (ACG) xray luminosity (combined) ionising emissivity (combined) escape-fraction weighted SFR (combined) stellar mass (MCG) star formation rate (MCG) ACG turnover mass MCG turnover mass Reionization turnover mass Metallicity """ # single element zero array to act as the grids (vcb, J_21_LW, z_reion, Gamma12) if not (halo_masses.shape == star_rng.shape == sfr_rng.shape == xray_rng.shape): raise ValueError("Halo masses and rng shapes must be identical.") n_halos = halo_masses.size out_buffer = np.zeros((n_halos, 12), dtype="f4") lo_dim = (inputs.simulation_options.HII_DIM,) * 3 if halo_coords is None: halo_coords = np.zeros(3 * n_halos) if vcb_grid is None: vcb_grid = np.zeros(lo_dim) if J_21_LW_grid is None: J_21_LW_grid = np.zeros(lo_dim) if z_re_grid is None: z_re_grid = np.zeros(lo_dim) if Gamma12_grid is None: Gamma12_grid = np.zeros(lo_dim) vcb_grid = vcb_grid.astype("f4") J_21_LW_grid = J_21_LW_grid.astype("f4") z_re_grid = z_re_grid.astype("f4") Gamma12_grid = Gamma12_grid.astype("f4") halo_masses = halo_masses.astype("f4") halo_coords = halo_coords.astype("f4") star_rng = star_rng.astype("f4") sfr_rng = sfr_rng.astype("f4") xray_rng = xray_rng.astype("f4") lib.test_halo_props( redshift, ffi.cast("float *", vcb_grid.ctypes.data), ffi.cast("float *", J_21_LW_grid.ctypes.data), ffi.cast("float *", z_re_grid.ctypes.data), ffi.cast("float *", Gamma12_grid.ctypes.data), n_halos, ffi.cast("float *", halo_masses.ctypes.data), ffi.cast("float *", halo_coords.ctypes.data), ffi.cast("float *", star_rng.ctypes.data), ffi.cast("float *", sfr_rng.ctypes.data), ffi.cast("float *", xray_rng.ctypes.data), ffi.cast("float *", out_buffer.ctypes.data), ) out_buffer = out_buffer.reshape(n_halos, 12) return { "halo_mass": out_buffer[:, 0].reshape(halo_masses.shape), "halo_stars": out_buffer[:, 1].reshape(halo_masses.shape), "halo_sfr": out_buffer[:, 2].reshape(halo_masses.shape), "halo_xray": out_buffer[:, 3].reshape(halo_masses.shape), "n_ion": out_buffer[:, 4].reshape(halo_masses.shape), "halo_wsfr": out_buffer[:, 5].reshape(halo_masses.shape), "halo_stars_mini": out_buffer[:, 6].reshape(halo_masses.shape), "halo_sfr_mini": out_buffer[:, 7].reshape(halo_masses.shape), "mturn_a": out_buffer[:, 8].reshape(halo_masses.shape), "mturn_m": out_buffer[:, 9].reshape(halo_masses.shape), "mturn_r": out_buffer[:, 10].reshape(halo_masses.shape), "metallicity": out_buffer[:, 11].reshape(halo_masses.shape), }
@init_c_state(sigma=True)
[docs] def return_uhmf_value( *, inputs: InputParameters, redshift: float, mass_values: Sequence[float], ): """Return the value of the unconditional halo mass function at given parameters. Parameters ---------- inputs : InputParameters The input parameters defining the simulation run. redshift : float The redshift at which to evaluate the halo mass function. mass_values : float The mass values at which to evaluate the halo mass function. """ growthf = lib.dicke(redshift) return np.vectorize(lib.unconditional_hmf)( growthf, np.log(mass_values), redshift, inputs.matter_options.cdict["HMF"] )
@init_c_state(sigma=True)
[docs] def return_chmf_value( *, inputs: InputParameters, redshift: float, mass_values: Sequence[float], delta_values: Sequence[float], condmass_values: Sequence[float], ): """Return the value of the conditional halo mass function at given parameters. Parameters ---------- inputs : InputParameters The input parameters defining the simulation run. redshift : float The redshift at which to evaluate the halo mass function. mass_values : float The mass values at which to evaluate the halo mass function. delta : float The overdensity at which to evaluate the halo mass function. cond_mass : float The condition mass at which to evaluate the halo mass function. """ growthf = lib.dicke(redshift) sigma = np.vectorize(lib.sigma_z0)(condmass_values) return np.vectorize(lib.conditional_hmf)( growthf, np.log(mass_values[None, None, :]), delta_values[:, None, None], sigma[None, :, None], inputs.matter_options.cdict["HMF"], )