# Simulation libraries
import skysurvey
# import sncosmo
# from astropy.table import Table
import numpy as np
from pyDOE import lhs # LHS sampler
import skysurvey_sniapop
from scipy.special import erfinv, erf, expit
from astropy.cosmology import Planck18, FlatLambdaCDM
# import pandas as pd
# import os
# ztf_logs_path = 'data/ztf_logs.parquet'
# if os.path.exists(ztf_logs_path):
# log_data = pd.read_parquet(ztf_logs_path)
# else:
# survey = skysurvey.survey.ZTF.from_logs()
fb = Planck18.Ob0 / Planck18.Om0
[docs]
def scan_params(priors, N, n_realisation=1, dtype=np.float32):
"""
Generate sampled parameter sets using Latin Hypercube Sampling (LHS),
using the per-parameter priors defined in `priors`.
Parameters
----------
priors : dict
Mapping param name -> {'range': (low, high), 'type': str}.
Supported types: 'uniform', 'gaussian', 'half-gaussian', 'log-uniform'.
N : int
Number of distinct parameter tuples.
n_realisation : int, optional
Number of realizations per parameter tuple.
dtype : data-type, optional
Numeric type for the sampled arrays (default is np.float32).
Returns
-------
params_dict : dict
Dictionary of parameter arrays of shape (N * n_realisation,).
"""
param_names = list(priors.keys())
n_params = len(param_names)
# LHS unit samples in [0,1]
unit_samples = lhs(n_params, samples=N) # shape (N, n_params)
samples = np.zeros_like(unit_samples)
for i, p in enumerate(param_names):
u = unit_samples[:, i]
info = priors[p]
low, high = info["range"]
ptype = info["type"]
if ptype == 'uniform':
samples[:, i] = u * (high - low) + low
elif ptype == 'gaussian':
mu = 0.5 * (low + high)
sigma = (high - low) / (2.0 * 1.96)
gaussian = np.sqrt(2.0) * erfinv(2.0 * u - 1.0)
samples[:, i] = mu + sigma * gaussian
elif ptype == 'half-gaussian':
if low != 0:
raise ValueError(f"Half-Gaussian prior requires low=0, got {low}")
sigma = high / 1.96
gaussian = np.sqrt(2.0) * erfinv(2.0 * u - 1.0)
samples[:, i] = np.abs(gaussian) * sigma
elif ptype == 'positive-gaussian':
mu = 0.5 * (low + high)
sigma = (high - low) / (2.0 * 1.96)
alpha = (0.0 - mu) / sigma
Phi_alpha = 0.5 * (1.0 + erf(alpha / np.sqrt(2.0)))
# Transform uniform samples into truncated normal
truncated_u = Phi_alpha + u * (1.0 - Phi_alpha)
gaussian = np.sqrt(2.0) * erfinv(2.0 * truncated_u - 1.0)
samples[:, i] = mu + sigma * gaussian
elif ptype == 'exponential':
if low != 0:
raise ValueError(f"Exponential prior requires low=0, got {low}")
# 95% mass below `high`
lam = -np.log(1.0 - 0.95) / high
# Standard exponential inverse CDF
samples[:, i] = -np.log(1.0 - u) / lam
elif ptype == 'truncated-exponential':
if low != 0:
raise ValueError(f"Exponential prior requires low=0, got {low}")
# Truncate the exponential distribution at `high`
lam = -np.log(1.0 - 0.95) / high # Lambda for 95% mass below `high`
# Inverse CDF for truncated exponential: F^{-1}(u) = -log(1 - u*(1 - exp(-lam*high))) / lam
# This ensures samples are in [0, high]
samples[:, i] = -np.log(1.0 - u * (1.0 - np.exp(-lam * high))) / lam
elif ptype == 'log-uniform':
if low <= 0:
raise ValueError(f"log-uniform prior for '{p}' requires low>0")
samples[:, i] = low * (high / low) ** u
else:
raise ValueError(f"Unknown prior type '{ptype}' for parameter '{p}'")
# Repeat for multiple realizations if needed
params_dict = {p: np.repeat(samples[:, i], n_realisation).astype(dtype)
for i, p in enumerate(param_names)}
return params_dict
[docs]
def simulate_one(params_dict, z_max, M, cols, errormodel=None, N=None, i=None, survey_name=None, lightcurve=False):
"""
Simulate a single dataset of SNe Ia.
Parameters
----------
params_dict : dict
Dictionary of model parameters (alpha, beta, mabs, gamma, sigma_int, etc.).
z_max : float
Maximum redshift.
M : int
Number of SNe to simulate.
cols : list of str
List of columns to include in the output.
errormodel : dict, optional
Error model to apply to the simulated data.
N : int, optional
Total number of simulations (for progress printing).
i : int, optional
Current simulation index (for progress printing).
survey : str, optional
Name of the survey (e.g., 'ztf', 'snls') to apply the corresponding selection function.
Returns
-------
data_dict : dict
Dictionary of lists (one per column) containing the simulated data.
"""
# Print progress
if N is not None and i is not None:
if (i+1) % max(1, N//10) == 0 or i == N-1:
print(f"Simulation {i+1}/{N}", end="\r", flush=True)
# Define default parameters including sigma_int
default_params = {
"alpha_low": 0.0,
"alpha_high": 0.0,
"beta": 0.0,
"mabs": -19.3,
"gamma": 0.0,
"sigma_int": 0.0, # default intrinsic scatter
"x1_ref": -0.5,
}
# Merge defaults with provided params (params_dict takes priority)
params = {**default_params, **params_dict}
# If a single alpha is provided, enforce alpha_low = alpha_high = alpha
if "alpha" in params:
params["alpha_low"] = params["alpha"]
params["alpha_high"] = params["alpha"]
# Ensure all are floats
alpha_low_ = float(params["alpha_low"])
alpha_high_ = float(params["alpha_high"])
beta_ = float(params["beta"])
mabs_ = float(params["mabs"])
gamma_ = float(params["gamma"])
sigma_int_ = float(params["sigma_int"])
x1_ref_ = float(params["x1_ref"])
# Set survey-specific selection parameters
cut_loc, cut_scale = None, None
if survey_name is not None:
cut_loc_key = f"cut_loc_{survey_name}"
cut_scale_key = f"cut_scale_{survey_name}"
cut_loc = params[cut_loc_key]
cut_scale = params[cut_scale_key]
if "Om0" in params:
Om0 = params["Om0"]
Ob0 = fb * Om0
cosmo = FlatLambdaCDM(
**(Planck18.parameters | {"Om0": Om0, "Ob0": Ob0})
)
else:
cosmo = FlatLambdaCDM(**(Planck18.parameters))
brokenalpha_model = skysurvey_sniapop.brokenalpha_model
# Generate SNe sample
snia = skysurvey.SNeIa.from_draw(
# tstart=survey.date_range[0],
# tstop=survey.date_range[1],
size=M,
zmax=z_max,
model=brokenalpha_model,
magabs={
"x1": "@x1",
"c": "@c",
"mabs": mabs_,
"sigmaint": sigma_int_,
"alpha_low": alpha_low_,
"alpha_high": alpha_high_,
"beta": beta_,
"gamma": gamma_,
"x1ref": x1_ref_
},
magobs={
'cosmology': cosmo
}
)
# if lightcurve==True:
# dset = skysurvey.dataset.DataSet.from_targets_and_survey(snia, survey, phase_range=[-20, 60])
# ndetection = dset.get_ndetection()
# detected_indexes = ndetection[ndetection >= 7].index
# detected_sne_data = dset.data.loc[detected_indexes]
# detected_sne_data['zpsys'] = 'ab'
# grouped = detected_sne_data.groupby(level=0)
# results = {}
# fitted_models = {}
# # Loop over each supernova
# for sn_id, sn_data in grouped:
# print(f'sn_id : {sn_id}', end='\r')
# # Rename columns to match sncosmo's expectations
# sncosmo_data = sn_data.rename(columns={
# 'mjd': 'time',
# 'flux': 'flux',
# 'fluxerr': 'fluxerr',
# 'zp': 'zp',
# 'band': 'band'
# })
# # sncosmo_data['zpsys'] = 'ab'
# # Convert to astropy.table.Table
# sncosmo_table = Table.from_pandas(sncosmo_data)
# row = data.iloc[sn_id]
# model = sncosmo.Model(source='salt2')
# # Set the model redshift to the known value
# model.set(z=row['z']) # fix redshift from your row datafra
# # Fit the light curve
# try:
# result, fitted_model = sncosmo.fit_lc(
# sncosmo_table,
# model,
# vparam_names=['t0', 'x0', 'x1', 'c'],
# guess_z=False,
# minsnr=5.0
# )
# results[sn_id] = result
# fitted_models[sn_id] = fitted_model
# except Exception as e:
# print(f"Failed to fit supernova {sn_id}: {e}")
# results[sn_id] = None
# Apply noise
if errormodel is None:
df = snia.data
else:
noisy_snia = snia.apply_gaussian_noise(errormodel)
df = noisy_snia.data
# Apply malmquist bias (selection) if survey-specific parameters are provided
if cut_loc is not None and cut_scale is not None:
mag = np.asarray(df["magobs"], dtype=np.float32)
# Detection probability
p_detect = 1.0 - expit((mag - cut_loc) * cut_scale)
# Bernoulli draw handled by numpy
mask = np.random.binomial(1, p_detect).astype(bool)
df = df.loc[mask].reset_index(drop=True)
# Collect columns
data_dict = {col: list(df[col]) for col in cols if col in df}
return data_dict