try:
from mpi4py import MPI
MPI_RANK = MPI.COMM_WORLD.Get_rank()
except:
MPI_RANK = 0
from builtins import *
import sys
import os
import warnings
from math import ceil
from multiprocessing import cpu_count
import itertools as itr
import numpy as np
from scipy.integrate import simps
import nestle
import emcee
from tqdm import tqdm
import dill
import blendz
from blendz import Configuration
from blendz.fluxes import Responses
from blendz.photometry import Photometry, SimulatedPhotometry
from blendz.utilities import incrementCount, Silence
try:
import pymultinest
PYMULTINEST_AVAILABLE = True
except ImportError:
PYMULTINEST_AVAILABLE = False
warnings.warn('PyMultinest not installed, so falling back to (slower) python implementation.'
+ ' See http://johannesbuchner.github.com/PyMultiNest/install.html for installation help.')
except (SystemExit, OSError):
PYMULTINEST_AVAILABLE = False
warnings.warn('PyMultinest failed to load, so falling back to (slower) python implementation.'
+ ' See http://johannesbuchner.github.com/PyMultiNest/install.html for installation help.')
[docs]class Photoz(object):
def __init__(self, model=None, photometry=None, config=None,\
load_state_path=None, **kwargs):
if load_state_path is not None:
self.loadState(load_state_path)
else:
#Warn user is config and either/or responses/photometry given that config ignored
if ((model is not None and config is not None) or
(photometry is not None and config is not None)):
warnings.warn('A configuration object was provided to Photoz object '
+ 'as well as a Model/Photometry object, though these '
+ 'should be mutually exclusive. The configuration '
+ 'provided will be ignored.')
#Responses and photometry given, merge their configs
if (model is not None) and (photometry is not None):
self.config = Configuration(**kwargs)
self.config.mergeFromOther(model.config)
self.config.mergeFromOther(photometry.config)
self.model = model
self.responses = self.model.responses
self.photometry = photometry
#Only responses given, use its config to load photometry
elif (model is not None) and (photometry is None):
self.config = Configuration(**kwargs)
self.config.mergeFromOther(model.config)
self.model = model
self.responses = self.model.responses
self.photometry = Photometry(config=self.config)
#Only photometry given, use its config to load responses
elif (model is None) and (photometry is not None):
self.config = Configuration(**kwargs)
self.config.mergeFromOther(photometry.config)
self.photometry = photometry
self.model = blendz.model.BPZ(config=self.config)
self.responses = self.model.responses
#Neither given, load both from provided (or default, if None) config
else:
self.config = Configuration(**kwargs)
if config is not None:
self.config.mergeFromOther(config)
self.photometry = Photometry(config=self.config)
self.model = blendz.model.BPZ(config=self.config)
self.responses = self.model.responses
# Get max_ref_mag_hi from photometry, which already deals with sigma vs fixed
self.max_ref_mag_hi = np.max([g.ref_mag_hi for g in self.photometry])
self.model.max_ref_mag_hi = self.max_ref_mag_hi
self.num_templates = self.responses.templates.num_templates
self.num_measurements = self.responses.filters.num_filters
self.num_galaxies = self.photometry.num_galaxies
self.tmp_ind_to_type_ind = self.responses.templates.tmp_ind_to_type_ind
self.possible_types = self.responses.templates.possible_types
self.num_types = len(self.possible_types)
#Default to assuming single component, present in all measurements
self.model._setMeasurementComponentMapping(None, 1)
#Set up empty dictionaries to put results into
self._samples = {}
self._logevd = {}
self._logevd_error = {}
for g in range(self.num_galaxies):
#Each value is a dictionary which will be filled by sample function
#The keys of this inner dictionary will be the number of blends for run
self._samples[g] = {}
self._logevd[g] = {}
self._logevd_error[g] = {}
[docs] def saveState(self, filepath):
"""Save this entire Photoz instance to file.
This saves the exact state of the current object, including all data and any
reults from sampling.
Args:
filepath (str): Path to file to save to.
"""
if isinstance(self.photometry, SimulatedPhotometry):
try:
current_seed = self.photometry.sim_seed.next()
self.photometry.sim_seed = current_seed
except:
warnings.warn('SimulatedPhotometry seed not saved.')
with open(filepath, 'wb') as f:
state = {key: val for key, val in self.__dict__.items() if key!='pbar'}
dill.dump(state, f)
#Put the random seed back how it was after the saving is done
if isinstance(self.photometry, SimulatedPhotometry):
try:
self.photometry.sim_seed = incrementCount(current_seed)
except:
pass
def loadState(self, filepath):
with open(filepath, 'rb') as f:
self.__dict__.update(dill.load(f))
#If the photometry is simulated, replace the seed currently saved as
#a number with the generator it was before saving
if isinstance(self.photometry, SimulatedPhotometry):
try:
current_seed = self.photometry.sim_seed
self.photometry.sim_seed = incrementCount(current_seed)
except:
warnings.warn('SimulatedPhotometry seed not loaded.')
def _lnLikelihood_flux(self, model_flux):
chi_sq = -1. * np.sum((self.photometry.current_galaxy.flux_data_noRef - model_flux)**2 / self.photometry.current_galaxy.flux_sigma_noRef**2)
return chi_sq
def _lnLikelihood_mag(self, total_ref_flux):
#chi_sq = -1. * np.sum((self.photometry.current_galaxy.ref_mag_data - total_ref_mag)**2 / self.photometry.current_galaxy.ref_mag_sigma**2)
chi_sq = -1. * np.sum((self.photometry.current_galaxy.ref_flux_data - total_ref_flux)**2 / self.photometry.current_galaxy.ref_flux_sigma**2)
return chi_sq
def _lnPosterior(self, params):
num_components = int(len(params) // 2)
redshifts = params[:num_components]
magnitudes = params[num_components:]
if not self.model._obeyPriorConditions(redshifts, magnitudes, self.photometry.current_galaxy.ref_mag_hi):
return -np.inf
else:
#Precalculate all quantities we'll need in the template loop
#Single interp call -> Shape = (N_template, N_band, N_component)
model_fluxes = self.responses.interp(redshifts)
priors = np.zeros((num_components, self.num_types))
for nb in range(num_components):
priors[nb, :] = self.model.lnPrior(redshifts[nb], magnitudes[nb])
redshift_correlation = np.log(1. + self.model.correlationFunction(redshifts))
#Get total flux in reference band = transform to flux & sum
total_ref_flux = np.sum(10.**(-0.4 * magnitudes))
selection_effect = self.model.lnSelection(total_ref_flux,
self.photometry.current_galaxy)
#Loop over all templates - discrete marginalisation
#All log probabilities so (multiply -> add) and (add -> logaddexp)
lnProb = -np.inf
#At each iteration template_combo is a tuple of (T_1, T_2... T_num_components)
for template_combo in itr.product(*itr.repeat(range(self.num_templates), num_components)):
#One redshift/template/magnitude prior and model flux for each blend component
tmp = 0.
blend_flux = np.zeros(self.num_measurements)
component_scaling_norm = 0.
for nb in range(num_components):
T = template_combo[nb]
component_scaling = 10.**(-0.4*magnitudes[nb]) / model_fluxes[T, self.config.ref_band, nb]
blend_flux += model_fluxes[T, :, nb] * component_scaling * self.model.measurement_component_mapping[nb, :]
type_ind = self.tmp_ind_to_type_ind[T]
tmp += priors[nb, type_ind]
#Remove ref_band from blend_fluxes, as that goes into the magnitude
#likelihood, not the flux likelihood
blend_flux = blend_flux[self.config.non_ref_bands]
#Other terms only appear once per summation-step
tmp += redshift_correlation
tmp += self._lnLikelihood_flux(blend_flux)
tmp += self._lnLikelihood_mag(total_ref_flux)
tmp += selection_effect
#logaddexp contribution from this template to marginalise
lnProb = np.logaddexp(lnProb, tmp)
return lnProb - self.prior_norm
def _priorTransform(self, params):
'''
Transform params from [0, 1] uniform random to [min, max] uniform random,
where the min redshift is zero, max redshift is set in the configuration,
and the min/max magnitudes (numerically, not brightness) are set by configuration.
'''
num_components = int(len(params) // 2)
trans = np.ones(len(params))
trans[:num_components] = self.config.z_hi - self.config.z_lo
trans[num_components:] = self.photometry.current_galaxy.ref_mag_hi - self.config.ref_mag_lo
shift = np.zeros(len(params))
shift[:num_components] = self.config.z_lo
shift[num_components:] = self.config.ref_mag_lo
return (params * trans) + shift
def _priorTransform_multinest(self, cube, ndim, nparams):
'''
Transform params from [0, 1] uniform random to [0, max] uniform random,
where the max redshift is set in the configuration, and the max fraction
is 1.
'''
num_components = ndim // 2
trans = np.zeros(ndim)
trans[:num_components] = self.config.z_hi
trans[num_components:] = self.photometry.current_galaxy.ref_mag_hi - self.config.ref_mag_lo
shift = np.zeros(ndim)
shift[:num_components] = self.config.z_lo
shift[num_components:] = self.config.ref_mag_lo
for i in range(ndim):
cube[i] = (cube[i] * trans[i]) + shift[i]
def _lnPosterior_multinest(self, cube, ndim, nparams):
self.num_posterior_evals += 1
params = np.array([cube[i] for i in range(ndim)])
with self.breakSilence():
if (self.num_posterior_evals%self.num_between_print==0) and MPI_RANK==0:
self.pbar.set_description('[Cmp: {}/{}, Itr: {}] '.format(self.blend_count,
self.num_components_sampling,
self.num_posterior_evals))
self.pbar.refresh()
return self._lnPosterior(params)
def _sampleProgressUpdate(self, info):
if (info['it']%self.num_between_print==0) and MPI_RANK==0:
self.pbar.set_description('[Cmp: {}/{}, Itr: {}] '.format(self.blend_count,
self.num_components_sampling,
info['it']))
self.pbar.refresh()
def normalise_prior_setup(self, galind, num_components, magnitude_grid_len=25,
redshift_grid_len=10):
# Set up 1D grids
self.magnitude_grid_len = magnitude_grid_len
self.redshift_grid_len = redshift_grid_len
self.redshift_grid = np.linspace(self.config.z_lo, self.config.z_hi,
self.redshift_grid_len)
ref_mag_hi = self.photometry[galind].ref_mag_hi
self.magnitude_grid = np.linspace(self.config.ref_mag_lo, ref_mag_hi,
self.magnitude_grid_len)
# Create 3D grid for single-component prior - redshift x magnitude x template
prior_grid_type = np.zeros((self.redshift_grid_len, self.magnitude_grid_len, self.num_types))
for i, z in enumerate(self.redshift_grid):
for j, m in enumerate(self.magnitude_grid):
prior_grid_type[i, j, :] = np.exp(self.model.lnPrior(z, m))
self.prior_grid_tmp = np.zeros((self.redshift_grid_len, self.magnitude_grid_len, self.num_templates))
for tmp in range(self.num_templates):
t = self.tmp_ind_to_type_ind[tmp]
self.prior_grid_tmp[:, :, tmp] = prior_grid_type[:, :, t]
def normalise_prior(self, galind, num_components):
# This could/should be replaced with code to cache prior grid
self.normalise_prior_setup(galind, num_components)
# Create N-D grid of 1 + xi([za, zb...])
xi_grid = np.zeros(tuple(self.redshift_grid_len for
i in range(num_components)))
for inds in itr.product(*itr.repeat(range(self.redshift_grid_len), num_components)):
try:
xi_grid[inds] = 1. + self.model.correlationFunction(
self.redshift_grid[np.array(inds)])
except ZeroDivisionError:
xi_grid[inds] = 1.
# Create N-D grid of S([ma, mb...])
select_grid = np.zeros(tuple(self.magnitude_grid_len for
i in range(num_components)))
for inds in itr.product(*itr.repeat(range(self.magnitude_grid_len), num_components)):
total_ref_flux = np.sum(10.**(-0.4*self.magnitude_grid[np.array(inds)]))
select_grid[inds] = np.exp(self.model.lnSelection(total_ref_flux,
self.photometry[galind]))
# Combine N 2D prior grids, N-D xi grid and N-D select grid into one 2N-D grid
# Axes = za, zb, ... ma, mb ...
total_shape = tuple(element for tpl in
tuple((self.redshift_grid_len, self.magnitude_grid_len) for i in range(num_components))
for element in tpl)
self.total_prior_grid = np.zeros(total_shape)
for zinds in itr.product(*itr.repeat(range(self.redshift_grid_len), num_components)):
for minds in itr.product(*itr.repeat(range(self.magnitude_grid_len), num_components)):
okayz = (self.config.sort_redshifts and tuple(sorted(zinds))==zinds)
okaym = (not self.config.sort_redshifts) and tuple(sorted(minds))==minds
if okayz or okaym:
prior_prod = np.ones(self.num_templates)
for cmp in range(num_components):
prior_prod *= self.prior_grid_tmp[zinds[cmp], minds[cmp], :]
prior_prod = np.sum(prior_prod)
inds = tuple(element for tpl in
tuple((zinds[i], minds[i]) for i in range(num_components))
for element in tpl)
self.total_prior_grid[inds] = prior_prod * select_grid[minds] * xi_grid[zinds]
else:
self.total_prior_grid[inds] = 0.
self.prior_norm = simps(simps(self.total_prior_grid, x=self.magnitude_grid), x=self.redshift_grid)
for cmp in range(num_components-1):
self.prior_norm = simps(simps(self.prior_norm , x=self.magnitude_grid), x=self.redshift_grid)
self.prior_norm = np.log(self.prior_norm)
[docs] def sample(self, num_components, galaxy=None, nresample=1000, seed=False,
measurement_component_mapping=None, npoints=150, print_interval=10,
use_pymultinest=None, save_path=None, save_interval=None):
"""Sample the posterior for a particular number of components.
Args:
num_components (int):
Sample the posterior defined for this number of components in the source.
galaxy (int or None):
Index of the galaxy to sample. If None, sample every galaxy in the
photometry. Defaults to None.
nresample (int):
Number of non-weighted samples to draw from the weighted samples
distribution from Nested Sampling. Defaults to 1000.
seed (bool or int):
Random seed for sampling to ensure deterministic results when
ampling again. If False, do not seed. If True, seed with value
derived from galaxy index. If int, seed with specific value.
measurement_component_mapping (None or list of tuples):
If None, sample from the fully blended posterior. For a partially
blended posterior, this should be a list of tuples (length = number of
measurements), where each tuples contains the (zero-based) indices of
the components that measurement contains. Defaults to None.
npoints (int):
Number of live points for the Nested Sampling algorithm. Defaults to 150.
print_interval (int):
Update the progress bar with number of posterior evaluations every
print_interval calls. Defaults to 10.
save_path (None or str):
Filepath for saving the Photoz object for reloading with `Photoz.loadState`.
If None, do not automatically save. If given, the Photoz object will
be saved to this path after all galaxies are sampled. If save_interval
is also not None, the Photoz object will be saved to this path every
save_interval galaxies. Defaults to None.
save_interval (None or int)
If given and save_path is not None, the Photoz object will be
saved to save_path every save_interval galaxies. Defaults to None.
use_pymultinest (bool or None)
If True, sample using the pyMultinest sampler. This requires PyMultiNest
to be installed separately. If False, sample using the Nestle sampler,
which is always installed when blendz is. If None, check whether pyMultinest
is installed and use it if it is, otherwise use Nestle. Defaults to None.
"""
if use_pymultinest is None:
use_pymultinest = PYMULTINEST_AVAILABLE
if isinstance(num_components, int):
num_components = [num_components]
else:
if measurement_component_mapping is not None:
#TODO: This is a time-saving hack to avoid dealing with multiple specifications
#The solution would probably be to rethink the overall design
raise ValueError('measurement_component_mapping cannot be set when sampling multiple numbers of components in one call. Do the separate cases separately.')
self.num_components_sampling = len(num_components)
self.num_between_print = float(round(print_interval))
if galaxy is None:
start = None
stop = None
self.num_galaxies_sampling = self.num_galaxies
elif isinstance(galaxy, int):
start = galaxy
stop = galaxy + 1
self.num_galaxies_sampling = 1
else:
raise TypeError('galaxy may be either None or an integer, but got {} instead'.format(type(galaxy)))
with tqdm(total=self.num_galaxies_sampling, unit='galaxy') as self.pbar:
self.gal_count = 1
for gal in self.photometry.iterate(start, stop):
self.blend_count = 1
for nb in num_components:
if seed is False:
rstate = np.random.RandomState()
elif seed is True:
rstate = np.random.RandomState(gal.index)
else:
rstate = np.random.RandomState(seed + gal.index)
num_param = 2 * nb
self.model._setMeasurementComponentMapping(measurement_component_mapping, nb)
self.normalise_prior(gal.index, nb)
if use_pymultinest:
if not os.path.exists('chains'):
os.makedirs('chains')
with Silence() as self.breakSilence:
self.num_posterior_evals = 0
pymultinest.run(self._lnPosterior_multinest, self._priorTransform_multinest,
num_param, resume=False, verbose=False, sampling_efficiency='model',
n_live_points=npoints)#,
#outputfiles_basename=os.path.join(blendz.CHAIN_PATH, 'chain_'))
results = pymultinest.analyse.Analyzer(num_param)#, outputfiles_basename=os.path.join(blendz.CHAIN_PATH, 'chain_'))
self._samples[gal.index][nb] = results.get_equal_weighted_posterior()[:, :-1]
self._logevd[gal.index][nb] = results.get_mode_stats()['global evidence']
self._logevd_error[gal.index][nb] = results.get_mode_stats()['global evidence error']
else:
results = nestle.sample(self._lnPosterior, self._priorTransform,
num_param, method='multi', npoints=npoints,
rstate=rstate, callback=self._sampleProgressUpdate)
self._samples[gal.index][nb] = results.samples[rstate.choice(len(results.weights), size=nresample, p=results.weights)]
self._logevd[gal.index][nb] = results.logz
self._logevd_error[gal.index][nb] = results.logzerr
self.blend_count += 1
self.gal_count += 1
if MPI_RANK==0:
self.pbar.update()
if (save_path is not None) and (save_interval is not None):
if gal.index % save_interval == 0:
self.saveState(save_path)
if save_path is not None:
self.saveState(save_path)
def _cacheTruthLikelihood(self):
self.model._setMeasurementComponentMapping(None, 1)
#Single interp call -> Shape = (N_template, N_band, N_component)
fixed_model_fluxes = {}
fixed_lnLikelihood_flux = np.zeros((self.photometry.num_galaxies,
self.responses.templates.num_templates))
for g in self.photometry:
fixed_model_fluxes[g.index] = self.responses.interp(np.array([g.truth[0]['redshift']]))
for T in range(self.responses.templates.num_templates):
scaling = 10.**(-0.4*g.ref_mag_data) / fixed_model_fluxes[g.index][T, self.config.ref_band, 0]
fixed_model_fluxes[g.index][T, :, 0] *= scaling
#Cache the flux likelihoods
non_ref_flux = fixed_model_fluxes[g.index][T, self.config.non_ref_bands, 0]
fixed_lnLikelihood_flux[g.index, T] = self._lnLikelihood_flux(non_ref_flux)
return fixed_lnLikelihood_flux
def calibrate(self, **kwargs):
cached_likelihood = self._cacheTruthLikelihood()
self.model.calibrate(self.photometry, cached_likelihood, **kwargs)
[docs] def chain(self, num_components, galaxy=None):
"""Return the (unweighted) posterior samples.
Args:
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to calculate log-evidence for. If None, return array
of log-evidence for every galaxy. Defaults to None.
"""
if galaxy is None:
return [self._samples[g][num_components] for g in range(self.num_galaxies)]
else:
return self._samples[galaxy][num_components]
[docs] def logevd(self, num_components, galaxy=None, return_error=False):
"""Return the natural log of the evidence.
Args:
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to calculate log-evidence for. If None, return array
of log-evidence for every galaxy. Defaults to None.
return_error (bool):
If True, also return the error on the log-evidence. If galaxy is None, this
is also an array. Defaults to False.
"""
if galaxy is None:
if return_error:
return np.array([self._logevd[g][num_components] for g in range(self.num_galaxies)]),\
np.array([self._logevd_error[g][num_components] for g in range(self.num_galaxies)])
else:
return np.array([self._logevd[g][num_components] for g in range(self.num_galaxies)])
else:
if return_error:
return self._logevd[galaxy][num_components], self._logevd_error[galaxy][num_components]
else:
return self._logevd[galaxy][num_components]
[docs] def logbayes(self, m, n, base=None, galaxy=None):
"""Return the log of the Bayes factor between m and n components, log[B_mn].
A positive value suggests that that evidence prefers the m-component model over
the n-component model.
Args:
m (int):
First number of components.
n (int):
Second number of components.
base (float or None):
Base of the log to return. If None, uses natural log.
Defaults to None.
galaxy (int or None):
Index of the galaxy to calculate B_mn for. If None, return array
of B_mn for every galaxy. Defaults to None.
"""
if base is None:
alter_base = 1.
else:
alter_base = np.log(base)
return (self.logevd(m, galaxy=galaxy) - self.logevd(n, galaxy=galaxy)) / alter_base
[docs] def applyToMarginals(self, func, num_components, galaxy=None, **kwargs):
"""Apply a function to the 1D marginal distribution samples of each parameter.
Args:
func (function):
The function to apply to the marginal distribution samples.
It should accept an array of the samples as its first argument,
with optional keyword arguments.
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to apply the function to. If None, return
array with a row for each galaxy. Defaults to None.
**kwargs:
Any optional keyword arguments to pass to the function.
"""
if galaxy is None:
out = np.zeros((self.num_galaxies, num_components * 2))
for g in range(self.num_galaxies):
for n in range(num_components * 2):
out[g, n] = func(self._samples[g][num_components][:, n], **kwargs)
else:
out = np.zeros(num_components * 2)
for n in range(num_components * 2):
out[n] = func(self._samples[galaxy][num_components][:, n], **kwargs)
return out
def _MAP1d(self, samps, bins=50):
vals, edges = np.histogram(samps, bins=bins)
return edges[np.argmax(vals)] + ((edges[1] - edges[0]) / 2.)
[docs] def max(self, num_components, galaxy=None, bins=20):
"""Return the maximum-a-posteriori point for the 1D marginal distribution of each parameter.
This is calculated by forming a 1D histogram of each marginal distribution
and assigning the MAP of that parameter as the centre of the tallest bin.
Args:
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to calculate the MAP for. If None, return array
with rows of MAPs for each galaxy. Defaults to None.
bins (int):
Number of bins to use for each 1D histogram.
"""
return self.applyToMarginals(self._MAP1d, num_components, galaxy=galaxy, bins=bins)
[docs] def mean(self, num_components, galaxy=None):
"""Return the mean point for the 1D marginal distribution of each parameter.
Args:
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to calculate the MAP for. If None, return array
with rows of means for each galaxy. Defaults to None.
"""
return self.applyToMarginals(np.mean, num_components, galaxy=galaxy)
[docs] def std(self, num_components, galaxy=None):
"""Return the standard deviation for the 1D marginal distribution of each parameter.
Args:
num_components (int):
Number of components.
galaxy (int or None):
Index of the galaxy to calculate the MAP for. If None, return array
with rows of means for each galaxy. Defaults to None.
"""
return self.applyToMarginals(np.std, num_components, galaxy=galaxy)
def quantiles(self, num_components, galaxy=None, q=(0.16, 0.84)):
try:
pcnt = [qq*100. for qq in q]
except TypeError:
pcnt = [100. * q]
#call numpy.percentile with q *= 100
return [self.applyToMarginals(np.percentile, num_components, galaxy=galaxy, q=pp) for pp in pcnt]