Source code for ximinf.generate_sim

# Simulation libraries
import skysurvey
# import sncosmo
# from astropy.table import Table
import numpy as np
from pyDOE import lhs  # LHS sampler
import skysurvey_sniapop
from scipy.special import erfinv, erf, expit
from astropy.cosmology import Planck18, FlatLambdaCDM
# import pandas as pd
# import os

# ztf_logs_path = 'data/ztf_logs.parquet'
# if os.path.exists(ztf_logs_path):
#     log_data = pd.read_parquet(ztf_logs_path)
# else:
#     survey = skysurvey.survey.ZTF.from_logs()



fb = Planck18.Ob0 / Planck18.Om0

[docs] def scan_params(priors, N, n_realisation=1, dtype=np.float32): """ Generate sampled parameter sets using Latin Hypercube Sampling (LHS), using the per-parameter priors defined in `priors`. Parameters ---------- priors : dict Mapping param name -> {'range': (low, high), 'type': str}. Supported types: 'uniform', 'gaussian', 'half-gaussian', 'log-uniform'. N : int Number of distinct parameter tuples. n_realisation : int, optional Number of realizations per parameter tuple. dtype : data-type, optional Numeric type for the sampled arrays (default is np.float32). Returns ------- params_dict : dict Dictionary of parameter arrays of shape (N * n_realisation,). """ param_names = list(priors.keys()) n_params = len(param_names) # LHS unit samples in [0,1] unit_samples = lhs(n_params, samples=N) # shape (N, n_params) samples = np.zeros_like(unit_samples) for i, p in enumerate(param_names): u = unit_samples[:, i] info = priors[p] low, high = info["range"] ptype = info["type"] if ptype == 'uniform': samples[:, i] = u * (high - low) + low elif ptype == 'gaussian': mu = 0.5 * (low + high) sigma = (high - low) / (2.0 * 1.96) gaussian = np.sqrt(2.0) * erfinv(2.0 * u - 1.0) samples[:, i] = mu + sigma * gaussian elif ptype == 'half-gaussian': if low != 0: raise ValueError(f"Half-Gaussian prior requires low=0, got {low}") sigma = high / 1.96 gaussian = np.sqrt(2.0) * erfinv(2.0 * u - 1.0) samples[:, i] = np.abs(gaussian) * sigma elif ptype == 'positive-gaussian': mu = 0.5 * (low + high) sigma = (high - low) / (2.0 * 1.96) alpha = (0.0 - mu) / sigma Phi_alpha = 0.5 * (1.0 + erf(alpha / np.sqrt(2.0))) # Transform uniform samples into truncated normal truncated_u = Phi_alpha + u * (1.0 - Phi_alpha) gaussian = np.sqrt(2.0) * erfinv(2.0 * truncated_u - 1.0) samples[:, i] = mu + sigma * gaussian elif ptype == 'exponential': if low != 0: raise ValueError(f"Exponential prior requires low=0, got {low}") # 95% mass below `high` lam = -np.log(1.0 - 0.95) / high # Standard exponential inverse CDF samples[:, i] = -np.log(1.0 - u) / lam elif ptype == 'truncated-exponential': if low != 0: raise ValueError(f"Exponential prior requires low=0, got {low}") # Truncate the exponential distribution at `high` lam = -np.log(1.0 - 0.95) / high # Lambda for 95% mass below `high` # Inverse CDF for truncated exponential: F^{-1}(u) = -log(1 - u*(1 - exp(-lam*high))) / lam # This ensures samples are in [0, high] samples[:, i] = -np.log(1.0 - u * (1.0 - np.exp(-lam * high))) / lam elif ptype == 'log-uniform': if low <= 0: raise ValueError(f"log-uniform prior for '{p}' requires low>0") samples[:, i] = low * (high / low) ** u else: raise ValueError(f"Unknown prior type '{ptype}' for parameter '{p}'") # Repeat for multiple realizations if needed params_dict = {p: np.repeat(samples[:, i], n_realisation).astype(dtype) for i, p in enumerate(param_names)} return params_dict
[docs] def simulate_one(params_dict, z_max, M, cols, errormodel=None, N=None, i=None, survey_name=None, lightcurve=False): """ Simulate a single dataset of SNe Ia. Parameters ---------- params_dict : dict Dictionary of model parameters (alpha, beta, mabs, gamma, sigma_int, etc.). z_max : float Maximum redshift. M : int Number of SNe to simulate. cols : list of str List of columns to include in the output. errormodel : dict, optional Error model to apply to the simulated data. N : int, optional Total number of simulations (for progress printing). i : int, optional Current simulation index (for progress printing). survey : str, optional Name of the survey (e.g., 'ztf', 'snls') to apply the corresponding selection function. Returns ------- data_dict : dict Dictionary of lists (one per column) containing the simulated data. """ # Print progress if N is not None and i is not None: if (i+1) % max(1, N//10) == 0 or i == N-1: print(f"Simulation {i+1}/{N}", end="\r", flush=True) # Define default parameters including sigma_int default_params = { "alpha_low": 0.0, "alpha_high": 0.0, "beta": 0.0, "mabs": -19.3, "gamma": 0.0, "sigma_int": 0.0, # default intrinsic scatter "x1_ref": -0.5, } # Merge defaults with provided params (params_dict takes priority) params = {**default_params, **params_dict} # If a single alpha is provided, enforce alpha_low = alpha_high = alpha if "alpha" in params: params["alpha_low"] = params["alpha"] params["alpha_high"] = params["alpha"] # Ensure all are floats alpha_low_ = float(params["alpha_low"]) alpha_high_ = float(params["alpha_high"]) beta_ = float(params["beta"]) mabs_ = float(params["mabs"]) gamma_ = float(params["gamma"]) sigma_int_ = float(params["sigma_int"]) x1_ref_ = float(params["x1_ref"]) # Set survey-specific selection parameters cut_loc, cut_scale = None, None if survey_name is not None: cut_loc_key = f"cut_loc_{survey_name}" cut_scale_key = f"cut_scale_{survey_name}" cut_loc = params[cut_loc_key] cut_scale = params[cut_scale_key] if "Om0" in params: Om0 = params["Om0"] Ob0 = fb * Om0 cosmo = FlatLambdaCDM( **(Planck18.parameters | {"Om0": Om0, "Ob0": Ob0}) ) else: cosmo = FlatLambdaCDM(**(Planck18.parameters)) brokenalpha_model = skysurvey_sniapop.brokenalpha_model # Generate SNe sample snia = skysurvey.SNeIa.from_draw( # tstart=survey.date_range[0], # tstop=survey.date_range[1], size=M, zmax=z_max, model=brokenalpha_model, magabs={ "x1": "@x1", "c": "@c", "mabs": mabs_, "sigmaint": sigma_int_, "alpha_low": alpha_low_, "alpha_high": alpha_high_, "beta": beta_, "gamma": gamma_, "x1ref": x1_ref_ }, magobs={ 'cosmology': cosmo } ) # if lightcurve==True: # dset = skysurvey.dataset.DataSet.from_targets_and_survey(snia, survey, phase_range=[-20, 60]) # ndetection = dset.get_ndetection() # detected_indexes = ndetection[ndetection >= 7].index # detected_sne_data = dset.data.loc[detected_indexes] # detected_sne_data['zpsys'] = 'ab' # grouped = detected_sne_data.groupby(level=0) # results = {} # fitted_models = {} # # Loop over each supernova # for sn_id, sn_data in grouped: # print(f'sn_id : {sn_id}', end='\r') # # Rename columns to match sncosmo's expectations # sncosmo_data = sn_data.rename(columns={ # 'mjd': 'time', # 'flux': 'flux', # 'fluxerr': 'fluxerr', # 'zp': 'zp', # 'band': 'band' # }) # # sncosmo_data['zpsys'] = 'ab' # # Convert to astropy.table.Table # sncosmo_table = Table.from_pandas(sncosmo_data) # row = data.iloc[sn_id] # model = sncosmo.Model(source='salt2') # # Set the model redshift to the known value # model.set(z=row['z']) # fix redshift from your row datafra # # Fit the light curve # try: # result, fitted_model = sncosmo.fit_lc( # sncosmo_table, # model, # vparam_names=['t0', 'x0', 'x1', 'c'], # guess_z=False, # minsnr=5.0 # ) # results[sn_id] = result # fitted_models[sn_id] = fitted_model # except Exception as e: # print(f"Failed to fit supernova {sn_id}: {e}") # results[sn_id] = None # Apply noise if errormodel is None: df = snia.data else: noisy_snia = snia.apply_gaussian_noise(errormodel) df = noisy_snia.data # Apply malmquist bias (selection) if survey-specific parameters are provided if cut_loc is not None and cut_scale is not None: mag = np.asarray(df["magobs"], dtype=np.float32) # Detection probability p_detect = 1.0 - expit((mag - cut_loc) * cut_scale) # Bernoulli draw handled by numpy mask = np.random.binomial(1, p_detect).astype(bool) df = df.loc[mask].reset_index(drop=True) # Collect columns data_dict = {col: list(df[col]) for col in cols if col in df} return data_dict