"""
Classes and methods needed to do hypersurface interpolation over arbitrary parameters.
"""
__all__ = ['HypersurfaceInterpolator', 'run_interpolated_fit', 'prepare_interpolated_fit',
'assemble_interpolated_fits', 'load_interpolated_hypersurfaces', 'pipeline_cfg_from_states',
'serialize_pipeline_cfg', 'get_incomplete_job_idx']
__author__ = 'T. Stuttard, A. Trettin'
__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
import os
import collections
import copy
import numpy as np
from scipy import interpolate
from .hypersurface import Hypersurface, HypersurfaceParam, load_hypersurfaces
from pisa import FTYPE, ureg
from pisa.utils import matrix
from pisa.utils.jsons import from_json, to_json
from pisa.utils.fileio import from_file, to_file
from pisa.core.pipeline import Pipeline
from pisa.core.binning import MultiDimBinning, is_binning
from pisa.core.map import Map
from pisa.core.param import Param, ParamSet
from pisa.utils.resources import find_resource
from pisa.utils.fileio import mkdir
from pisa.utils.log import logging, set_verbosity
from pisa.utils.comparisons import ALLCLOSE_KW
from uncertainties import ufloat, correlated_values
from uncertainties import unumpy as unp
class HypersurfaceInterpolator(object):
"""Factory for interpolated hypersurfaces.
After being initialized with a set of hypersurface fits produced at different
parameters, it uses interpolation to produce a Hypersurface object
at a given point in parameter space using scipy's `RegularGridInterpolator`.
The interpolation is piecewise-linear between points. All points must lie on a
rectilinear ND grid.
Parameters
----------
interpolation_param_spec : dict
Specification of interpolation parameter grid of the form::
interpolation_param_spec = {
'param1': {"values": [val1_1, val1_2, ...], "scales_log": True/False}
'param2': {"values": [val2_1, val2_2, ...], "scales_log": True/False}
...
'paramN': {"values": [valN_1, valN_2, ...], "scales_log": True/False}
}
where values are given as :obj:`Quantity`.
hs_fits : list of dict
        list of dicts with hypersurfaces that were fit at the points of the parameter mesh
defined by interpolation_param_spec
ignore_nan : bool
Ignore empty bins in hypersurfaces. The intercept in those bins is set to 1 and
all slopes are set to 0.
Notes
-----
Be sure to give a support that covers the entire relevant parameter range and a
    good distance beyond! To prevent minimization failures from NaNs, parameter
    values requested outside of the support are clipped to its boundary; needless
    to say, the results there are unreliable.
See Also
--------
scipy.interpolate.RegularGridInterpolator :
class used for interpolation
"""
def __init__(self, interpolation_param_spec, hs_fits, ignore_nan=True):
self.ndim = len(interpolation_param_spec.keys())
# key ordering is important to guarantee that dimensions stay consistent
msg = "interpolation params must be specified as a dict with ordered keys"
assert isinstance(interpolation_param_spec, collections.OrderedDict), msg
for k, v in interpolation_param_spec.items():
assert set(v.keys()) == {"values", "scales_log"}
assert isinstance(v["values"], collections.abc.Sequence)
self.interp_param_spec = interpolation_param_spec
reference_hs = hs_fits[0]["hs_fit"]
# we are going to produce the hypersurface from a state that is the same
# as the reference, only the coefficients and covariance matrices are
# injected from the interpolation.
self._reference_state = copy.deepcopy(reference_hs.serializable_state)
# for cleanliness we wipe numbers from the original state
self._reference_state["intercept_sigma"] = np.nan
self._reference_state["fit_maps_norm"] = None
self._reference_state["fit_maps_raw"] = None
self._reference_state["fit_chi2"] = np.nan
for param in self._reference_state['params'].values():
param['fit_coeffts_sigma'] = np.full_like(
param['fit_coeffts_sigma'], np.nan)
# Instead of holding numbers, these coefficients and covariance matrices are
        # interpolator objects that produce them at the requested point.
# The shape of fit_coeffts is [binning ..., fit coeffts]
self.coeff_shape = reference_hs.fit_coeffts.shape
self.coefficients = None
# The shape of fit_cov_mat is [binning ..., fit coeffts, fit coeffts]
self.covars_shape = reference_hs.fit_cov_mat.shape
self.covars = None
# We now need to massage the fit coefficients into the correct shape
# for interpolation.
# The dimensions of the interpolation parameters come first, the dimensions
        # of the hypersurface coefficients come last.
self.interp_shape = tuple(len(v["values"]) for v in self.interp_param_spec.values())
# dimension is [interp_shape, binning..., fit coeffts]
self._coeff_z = np.zeros(self.interp_shape + self.coeff_shape)
# dimension is [interp_shape, binning..., fit coeffts, fit coeffts]
self._covar_z = np.zeros(self.interp_shape + self.covars_shape)
# Here we use the same indexing as below in `fit_hypersurfaces`
for i, idx in enumerate(np.ndindex(self.interp_shape)):
# As an additional safety measure, we check that the parameters are what
# we expect to find at this index.
expected_params = dict(
(n, self.interp_param_spec[n]["values"][idx[j]])
for j, n in enumerate(self.interp_param_spec.keys())
)
param_values = hs_fits[i]["param_values"]
msg = ("The stored values where hypersurfaces were fit do not match those"
"in the interpolation grid.")
assert np.all([expected_params[n].m == param_values[n].m
for n in self.interp_param_spec.keys()]), msg
self._coeff_z[idx] = hs_fits[i]["hs_fit"].fit_coeffts
self._covar_z[idx] = hs_fits[i]["hs_fit"].fit_cov_mat
grid_coords = list(
np.array([val.m for val in val_list["values"]])
for val_list in self.interp_param_spec.values()
)
self.param_bounds = [(np.min(grid_vals), np.max(grid_vals))
for grid_vals in grid_coords]
# If a parameter scales as log, we give the log of the parameter to the
# interpolator. We must not forget to do this again when we call the
# interpolator later!
for i, param_name in enumerate(self.interpolation_param_names):
if self.interp_param_spec[param_name]["scales_log"]:
grid_coords[i] = np.log10(grid_coords[i])
self.coefficients = interpolate.RegularGridInterpolator(
grid_coords,
self._coeff_z,
# We disable extrapolation, but clip parameter values inside the valid
# range.
bounds_error=True, fill_value=None
)
self.covars = interpolate.RegularGridInterpolator(
grid_coords,
self._covar_z,
bounds_error=True, fill_value=None
)
# In order not to spam warnings, we only want to warn about non positive
# semi definite covariance matrices once for each bin. We store the bin
        # indices for which the warning has already been issued.
self.covar_bins_warning_issued = []
self.ignore_nan = ignore_nan
@property
def interpolation_param_names(self):
return list(self.interp_param_spec.keys())
@property
def param_names(self):
return list(self._reference_state["params"].keys())
@property
def binning(self):
binning = self._reference_state["binning"]
if not is_binning(binning) :
binning = MultiDimBinning(**binning)
return binning
@property
def num_interp_params(self):
return len(self.interp_param_spec.keys())
def get_hypersurface(self, **param_kw):
"""
Get a Hypersurface object with interpolated coefficients.
Parameters
----------
**param_kw
Parameters are given as keyword arguments, where the names
of the arguments must match the names of the parameters over
which the hypersurfaces are interpolated. The values
are given as :obj:`Quantity` objects with units.
"""
assert set(param_kw.keys()) == set(self.interp_param_spec.keys()), "invalid parameters"
# getting param magnitudes in the same units as the parameter specification
x = np.array([
param_kw[p].m_as(self.interp_param_spec[p]["values"][0].u)
# we have checked that this is an OrderedDict so that the order of x is not
# ambiguous here
for p in self.interp_param_spec.keys()
])
assert len(x) == len(self.param_bounds)
for i, bounds in enumerate(self.param_bounds):
x[i] = np.clip(x[i], *bounds)
# if a parameter scales as log, we have to take the log here again
for i, param_name in enumerate(self.interpolation_param_names):
if self.interp_param_spec[param_name]["scales_log"]:
# We must be strict with raising errors here, because otherwise
# the Hypersurface will suddenly have NaNs everywhere! This shouldn't
# happen because we clip values into the valid parameter range.
if x[i] <= 0:
raise RuntimeError("A log-scaling parameter cannot become zero "
"or negative!")
x[i] = np.log10(x[i])
state = copy.deepcopy(self._reference_state)
# fit covariance matrices are stored directly in the state while fit coeffts
# must be assigned with the setter method...
# need squeeze here because the RegularGridInterpolator always puts another
# dimension around the output
state["fit_cov_mat"] = np.squeeze(self.covars(x))
assert state["fit_cov_mat"].shape == self.covars_shape
        if not self.ignore_nan:
            for idx in np.ndindex(state['fit_cov_mat'].shape):
                assert np.isfinite(state['fit_cov_mat'][idx]), ("invalid cov matrix "
                    f"element encountered at {param_kw} in loc {idx}")
# check covariance matrices for symmetry, positive semi-definiteness
for bin_idx in np.ndindex(state['fit_cov_mat'].shape[:-2]):
m = state['fit_cov_mat'][bin_idx]
if self.ignore_nan and np.any(~np.isfinite(m)):
state['fit_cov_mat'][bin_idx] = np.identity(m.shape[0])
m = state['fit_cov_mat'][bin_idx]
assert np.allclose(
m, m.T, rtol=ALLCLOSE_KW['rtol']*10.), f'cov matrix not symmetric in bin {bin_idx}'
if not matrix.is_psd(m):
state['fit_cov_mat'][bin_idx] = matrix.fronebius_nearest_psd(m)
if not bin_idx in self.covar_bins_warning_issued:
logging.warn(
f'Invalid covariance matrix fixed in bin: {bin_idx}')
self.covar_bins_warning_issued.append(bin_idx)
hypersurface = Hypersurface.from_state(state)
coeffts = np.squeeze(self.coefficients(x)) # calls interpolator
assert coeffts.shape == self.coeff_shape
# check that coefficients exist and if not replace with default values
for idx in np.ndindex(self.coeff_shape):
if self.ignore_nan and ~np.isfinite(coeffts[idx]):
coeffts[idx] = 1 if idx[-1] == 0 else 0 # set intercept to 1, slopes 0
assert np.isfinite(coeffts[idx]), ("invalid coeff encountered at "
f"{param_kw} in loc {idx}")
# the setter method defined in the Hypersurface class takes care of
# putting the coefficients in the right place in their respective parameters
hypersurface.fit_coeffts = coeffts
return hypersurface
def _make_slices(self, *xi):
"""Make slices of hypersurfaces for plotting.
        In some bins, the interpolated covariance matrices have to be corrected
        to be positive semi-definite. The slices produced by this function
        include all of those effects.
Parameters
----------
xi : list of ndarray
Points at which the hypersurfaces are to be evaluated. The length of the
list must equal the number of parameters, each ndarray in the list must have
the same shape (slice_shape).
Returns
-------
coeff_slices : numpy.ndarray
slices in fit coefficients. Size: (binning..., number of coeffs) + slice_shape
covar_slices : numpy.ndarray
slices in covariance matrix elements.
Size: (binning..., number of coeffs, number of coeffs) + slice_shape
"""
slice_shape = xi[0].shape
for x in xi:
assert x.shape == slice_shape
assert len(xi) == self.num_interp_params
coeff_slices = np.zeros(self.coeff_shape + slice_shape)
covar_slices = np.zeros(self.covars_shape + slice_shape)
for idx in np.ndindex(slice_shape):
pars = collections.OrderedDict()
for i, name in enumerate(self.interpolation_param_names):
pars[name] = xi[i][idx]
hs = self.get_hypersurface(**pars)
slice_idx = (Ellipsis,) + idx
coeff_slices[slice_idx] = hs.fit_coeffts
covar_slices[slice_idx] = hs.fit_cov_mat
return coeff_slices, covar_slices
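    # A minimal sketch of producing a 1D slice through the interpolation, assuming
    # a single, hypothetical, dimensionless interpolation parameter "opt_eff":
    #
    #     x = np.linspace(0.9, 1.1, 50) * ureg.dimensionless
    #     coeff_slices, covar_slices = interp._make_slices(x)
    #     # coeff_slices now has shape (binning..., n_coeffts, 50)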
def plot_fits_in_bin(self, bin_idx, ax=None, n_steps=20, **param_kw):
"""
Plot the coefficients as well as covariance matrix elements as a function
of the interpolation parameters.
Parameters
----------
bin_idx : tuple
index of the bin for which to plot the fits
ax : 2D array of axes, optional
axes into which to place the plots. If None (default),
appropriate axes will be generated. Must have at least
size (n_coeff, n_coeff + 1).
n_steps : int, optional
number of steps to plot between minimum and maximum
**param_kw :
Parameters to be fixed when producing slices. If the interpolation
is in N-D, then (N-2) parameters need to be fixed to produce 2D plots
of the remaining 2 parameters and (N-1) need to be fixed to produce a
1D slice.
"""
plot_dim = self.ndim - len(param_kw.keys())
assert plot_dim in [1, 2], "plotting only supported in 1D or 2D"
import matplotlib.pyplot as plt
n_coeff = self.coeff_shape[-1]
hs_param_names = list(self._reference_state['params'].keys())
hs_param_labels = ["intercept"] + [f"{p} p{i}" for p in hs_param_names
for i in range(self._reference_state['params'][p]['num_fit_coeffts'])]
fig = None
if ax is None:
fig, ax = plt.subplots(nrows=n_coeff, ncols=n_coeff+1,
squeeze=False, sharex=True,
figsize=(3+(5*(n_coeff+1)), 2+(3*n_coeff)) )
# remember whether the plots need log scale or not, by default not
x_is_log = False
y_is_log = False
# names of the variables we are plotting
plot_names = set(self.interpolation_param_names) - set(param_kw.keys())
if plot_dim == 1:
x_name = list(plot_names)[0]
else:
x_name, y_name = list(plot_names)
# in both 1D and 2D cases, we always plot at least an x-variable
x_unit = self.interp_param_spec[x_name]["values"][0].u
# we need the magnitudes here so that units are unambiguous when we make
# the linspace/geomspace for plotting
x_mags = [v.m_as(x_unit) for v in self.interp_param_spec[x_name]["values"]]
if self.interp_param_spec[x_name]["scales_log"]:
x_plot = np.geomspace(np.min(x_mags), np.max(x_mags), n_steps)
x_is_log = True
else:
x_plot = np.linspace(np.min(x_mags), np.max(x_mags), n_steps)
# we put the unit back later
if plot_dim == 1:
# To make slices, we need to set any variables we do not plot over to the
# value given in param_kw.
slice_args = []
# We need to make sure that we give the values in the correct order!
for n in self.interpolation_param_names:
if n == x_name:
slice_args.append(x_plot * x_unit)
elif n in param_kw.keys():
                    # again, ensure that the same unit is used that went into the
# interpolation
param_unit = self.interp_param_spec[n]["values"][0].u
slice_args.append(
np.full(x_plot.shape, param_kw[n].m_as(param_unit)) * param_unit
)
else:
raise ValueError("parameter neither specified nor plotted")
coeff_slices, covar_slices = self._make_slices(*slice_args)
else:
# if we are in 2D, we need to do the same procedure again for the y-variable
y_unit = self.interp_param_spec[y_name]["values"][0].u
y_mags = [v.m_as(y_unit) for v in self.interp_param_spec[y_name]["values"]]
if self.interp_param_spec[y_name]["scales_log"]:
# we add one step to the size in y so that transposition is unambiguous
y_plot = np.geomspace(np.min(y_mags), np.max(y_mags), n_steps + 1)
y_is_log = True
else:
y_plot = np.linspace(np.min(y_mags), np.max(y_mags), n_steps + 1)
x_mesh, y_mesh = np.meshgrid(x_plot, y_plot)
slice_args = []
for n in self.interpolation_param_names:
if n == x_name:
slice_args.append(x_mesh * x_unit)
elif n == y_name:
slice_args.append(y_mesh * y_unit)
elif n in param_kw.keys():
                    # again, ensure that the same unit is used that went into the
# interpolation
param_unit = self.interp_param_spec[n]["values"][0].u
slice_args.append(
np.full(x_mesh.shape, param_kw[n].m_as(param_unit)) * param_unit
)
else:
raise ValueError("parameter neither specified nor plotted")
coeff_slices, covar_slices = self._make_slices(*slice_args)
# first column plots fit coefficients
for i in range(n_coeff):
z_slice = coeff_slices[bin_idx][i]
if plot_dim == 1:
ax[i, 0].plot(x_plot, z_slice, label='interpolation')
# Plotting the original input points only works if the interpolation
# is in 1D. If we are plotting a 1D slice from a 2D interpolation, this
# does not work.
# The number of fit points is the first dimension in self._coeff_z
if plot_dim == self.ndim:
slice_idx = (Ellipsis,) + bin_idx + (i,)
ax[i, 0].scatter(x_mags, self._coeff_z[slice_idx],
color='k', marker='x', label='fit points')
ax[i, 0].set_ylabel(hs_param_labels[i])
else:
pc = ax[i, 0].pcolormesh(x_mesh, y_mesh, z_slice)
cbar = plt.colorbar(pc, ax=ax[i, 0])
cbar.ax.ticklabel_format(style='sci', scilimits=(0, 0))
ax[i, 0].set_ylabel(y_name)
ax[i, 0].set_xlabel(x_name)
            # later columns plot the elements of the covariance matrix
for j in range(0, n_coeff):
z_slice = covar_slices[bin_idx][i, j]
if plot_dim == 1:
ax[i, j+1].plot(x_plot, z_slice, label='interpolation')
# Same problem as above, only in 1D case can this be shown
# the number of points is the first dim in self._covar_z
if plot_dim == self.ndim:
coeff_idx = (Ellipsis,) + bin_idx + (i, j)
ax[i, j+1].scatter(x_mags, self._covar_z[coeff_idx],
color='k', marker='x', label='fit points')
else:
pc = ax[i, j+1].pcolormesh(x_mesh, y_mesh, z_slice)
cbar = plt.colorbar(pc, ax=ax[i, j+1])
cbar.ax.ticklabel_format(style='sci', scilimits=(0, 0))
ax[i, j+1].set_ylabel(y_name)
ax[i, j+1].set_xlabel(x_name)
if plot_dim == 1:
# in the 1D case, labels can be placed on the x and y axes
for j in range(n_coeff+1):
ax[-1, j].set_xlabel(x_name)
ax[0, 0].set_title('coefficient')
for j in range(n_coeff):
ax[0, j+1].set_title(f'cov. {hs_param_labels[j]}')
else:
# in the 2D case, we need separate annotations
rows = hs_param_labels
cols = ["coefficient"] + [f"cov. {hl}" for hl in hs_param_labels]
pad = 20
for a, col in zip(ax[0], cols):
a.annotate(col, xy=(0.5, 1), xytext=(0, pad),
xycoords='axes fraction', textcoords='offset points',
size='x-large', ha='center', va='baseline')
for a, row in zip(ax[:, 0], rows):
a.annotate(row, xy=(0, 0.5), xytext=(-a.yaxis.labelpad - pad, 0),
xycoords=a.yaxis.label, textcoords='offset points',
size='x-large', ha='right', va='center')
for i, j in np.ndindex((n_coeff, n_coeff+1)):
if x_is_log: ax[i, j].set_xscale("log")
if y_is_log: ax[i, j].set_yscale("log")
ax[i, j].grid()
if plot_dim == 1:
ax[i, j].legend()
# ax[i, j].relim()
# ax[i, j].autoscale_view()
if not x_is_log:
ax[i, j].ticklabel_format(style='sci', scilimits=(0, 0), axis="x")
if not y_is_log:
ax[i, j].ticklabel_format(style='sci', scilimits=(0, 0), axis="y")
if fig is not None :
fig.tight_layout()
if plot_dim == 2:
fig.subplots_adjust(left=0.15, top=0.95)
return fig
else :
return
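    # A minimal usage sketch (bin index, parameter name and values are hypothetical):
    #
    #     fig = interp.plot_fits_in_bin((2, 3))  # 1D or 2D interpolation
    #     fig = interp.plot_fits_in_bin((2, 3), opt_eff=1.0 * ureg.dimensionless)
    #     # the second call fixes one parameter to slice a 3D interpolation down to 2D
    #     fig.savefig("hs_interpolation_bin_2_3.pdf")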
def pipeline_cfg_from_states(state_dict):
"""Recover a pipeline cfg containing PISA objects from a raw state.
When a pipeline configuration is stored to JSON, the PISA objects turn into
their serialized states. This function looks through the dictionary returned by
`from_json` and recovers the PISA objects such as `ParamSet` and `MultiDimBinning`.
It should really become part of PISA file I/O functionality to read and write
PISA objects inside dictionaries/lists into a JSON and be able to recover
them...
"""
# TODO: Make this a core functionality of PISA
# This is just a mess... some objects have a `from_state` method, some take the
# unpacked state dict as input, some take the state...
pipeline_cfg = collections.OrderedDict()
for stage_key in state_dict.keys():
# need to check all of this manually... no automatic way to do it :(
if stage_key == "pipeline":
pipeline_cfg[stage_key] = copy.deepcopy(state_dict[stage_key])
pipeline_cfg[stage_key]["output_key"] = tuple(
pipeline_cfg[stage_key]["output_key"])
binning_state = pipeline_cfg[stage_key]["output_binning"]
pipeline_cfg[stage_key]["output_binning"] = MultiDimBinning(**binning_state)
continue
# undo what we did in `serialize_pipeline_cfg` by splitting the keys into tuples
tuple_key = tuple(stage_key.split("__"))
pipeline_cfg[tuple_key] = copy.deepcopy(state_dict[stage_key])
for k in ["calc_mode", "apply_mode", "node_mode"]:
if k in pipeline_cfg[tuple_key]:
if isinstance(pipeline_cfg[tuple_key][k], collections.abc.Mapping):
pipeline_cfg[tuple_key][k] = MultiDimBinning(
**pipeline_cfg[tuple_key][k])
if "params" in pipeline_cfg[tuple_key].keys():
pipeline_cfg[tuple_key]["params"] = ParamSet(
pipeline_cfg[tuple_key]["params"])
# if any stage takes any other arguments that we didn't think of here, they
# won't work
return pipeline_cfg
def serialize_pipeline_cfg(pipeline_cfg):
"""Turn a pipeline configuration into something we can store to JSON.
It doesn't work by default because tuples are not allowed as keys when storing to
    JSON. All we do is turn the tuples into strings joined by a double underscore.
"""
serializable_state = collections.OrderedDict()
serializable_state["pipeline"] = pipeline_cfg["pipeline"]
for k in pipeline_cfg.keys():
if k == "pipeline": continue
flat_key = "__".join(k)
serializable_state[flat_key] = pipeline_cfg[k]
# this isn't _really_ a serializable state, the objects are still PISA objects...
    # but it will convert correctly when thrown into `to_json`
return serializable_state
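# For example, a per-stage key ("flux", "honda_ip") becomes the JSON-safe string
# "flux__honda_ip", and `pipeline_cfg_from_states` reverses the transformation after
# the file is read back. A round-trip sketch (stage/service names and the file name
# are illustrative only):
#
#     flat_cfg = serialize_pipeline_cfg(pipeline_cfg)
#     to_json(flat_cfg, "pipeline_cfg.json")
#     restored = pipeline_cfg_from_states(from_json("pipeline_cfg.json"))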
def assemble_interpolated_fits(fit_directory, output_file, drop_fit_maps=False, leftout_param=None, leftout_surface=None):
"""After all of the fits on the cluster are done, assemble the results to one JSON.
The JSON produced by this function is what `load_interpolated_hypersurfaces`
expects.
"""
assert os.path.isdir(fit_directory), "fit directory does not exist"
metadata = from_json(os.path.join(fit_directory, "metadata.json"))
combined_data = collections.OrderedDict()
combined_data["interpolation_param_spec"] = metadata["interpolation_param_spec"]
# Loop over grid points
hs_fits = []
grid_shape = tuple(metadata["grid_shape"])
for job_idx, grid_idx in enumerate(np.ndindex(grid_shape)):
# Load grid point data
gridpoint_json = os.path.join(fit_directory, f"gridpoint_{job_idx:06d}.json.bz2")
logging.info(f"Reading {gridpoint_json}")
gridpoint_data = from_json(gridpoint_json)
# Check the loaded data
assert job_idx == gridpoint_data["job_idx"]
assert np.all(grid_idx == gridpoint_data["grid_idx"])
# TODO: Offer to run incomplete fits locally
assert gridpoint_data["fit_successful"], f"job no. {job_idx} not finished"
# Drop fit maps if requested (can significantly reduce file size)
if drop_fit_maps :
for key, hs_state in gridpoint_data["hs_fit"].items() :
hs_state["fit_maps_raw"] = None
hs_state["fit_maps_norm"] = None
if leftout_param is not None:
for surface in leftout_surface:
gridpoint_data["hs_fit"][surface]["params"][leftout_param]["fit_coeffts"] *= 0.0
print(gridpoint_data["hs_fit"][surface]["params"][leftout_param]["fit_coeffts"])
# Add grid point data to output file
hs_fits.append(collections.OrderedDict(
param_values=gridpoint_data["param_values"],
hs_fit=gridpoint_data["hs_fit"]
))
# Write the output file
combined_data["hs_fits"] = hs_fits
to_file(combined_data, output_file)
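# A minimal usage sketch (paths are hypothetical):
#
#     assemble_interpolated_fits(
#         fit_directory="/path/to/fit_dir",
#         output_file="/path/to/interpolated_hs.json.bz2",
#         drop_fit_maps=True,  # omit the fitted maps to keep the output file small
#     )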
def get_incomplete_job_idx(fit_directory):
"""Get job indices of fits that are not flagged as successful."""
assert os.path.isdir(fit_directory), "fit directory does not exist"
metadata = from_json(os.path.join(fit_directory, "metadata.json"))
grid_shape = tuple(metadata["grid_shape"])
failed_idx = []
for job_idx, grid_idx in enumerate(np.ndindex(grid_shape)):
try:
gridpoint_json = os.path.join(fit_directory,
f"gridpoint_{job_idx:06d}.json.bz2")
logging.info(f"Reading {gridpoint_json}")
gridpoint_data = from_json(gridpoint_json)
except:
break
if not gridpoint_data["fit_successful"]:
failed_idx.append(job_idx)
job_idx += 1
return failed_idx
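# A minimal sketch of re-running fits that did not finish (the path is hypothetical):
#
#     for idx in get_incomplete_job_idx("/path/to/fit_dir"):
#         run_interpolated_fit("/path/to/fit_dir", idx, skip_successful=True)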
def run_interpolated_fit(fit_directory, job_idx, skip_successful=False):
"""Run the hypersurface fit for a grid point.
If `skip_successful` is true, do not run if the `fit_successful` flag is already
True.
"""
#TODO a lot of this is copied from fit_hypersurfaces in hypersurface.py, would be safer to make more OAOO
#TODO Copy the param value storage stuff from fit_hypersurfaces across in the meantime
assert os.path.isdir(fit_directory), "fit directory does not exist"
gridpoint_json = os.path.join(fit_directory, f"gridpoint_{job_idx:06d}.json.bz2")
gridpoint_data = from_json(gridpoint_json)
if skip_successful and gridpoint_data["fit_successful"]:
logging.info(f"Fit at job index {job_idx} already successful, skipping...")
return
metadata = from_json(os.path.join(fit_directory, "metadata.json"))
interpolation_param_spec = metadata["interpolation_param_spec"]
# this is a pipeline configuration in the form of an OrderedDict
nominal_dataset = metadata["nominal_dataset"]
# Why can we still not load PISA objects from JSON that are inside a dict?! Grrr...
nominal_dataset["pipeline_cfg"] = pipeline_cfg_from_states(
nominal_dataset["pipeline_cfg"]
)
# this is a list of pipeline configurations
sys_datasets = metadata["sys_datasets"]
for sys_dataset in sys_datasets:
sys_dataset["pipeline_cfg"] = pipeline_cfg_from_states(
sys_dataset["pipeline_cfg"]
)
# this is a dict of param_name : value pairs
param_values = gridpoint_data["param_values"]
# we do a redundant check to make sure the parameter values at this grid point are
# correct
interpolation_param_names = metadata["interpolation_param_names"]
grid_shape = tuple(metadata["grid_shape"])
# the grid point index of this job
grid_idx = list(np.ndindex(grid_shape))[job_idx]
for i, n in enumerate(interpolation_param_names):
ms = "Inconsistent parameter values at grid point!"
assert interpolation_param_spec[n]["values"][grid_idx[i]] == param_values[n], ms
# now we need to adjust the values of the parameter in all pipelines for this point
logging.info(f"updating pipelines with parameter values: {param_values}")
for dataset in [nominal_dataset] + sys_datasets:
for stage_cfg in dataset["pipeline_cfg"].values():
if "params" not in stage_cfg.keys(): continue
for param in interpolation_param_names:
if param in stage_cfg["params"].names:
stage_cfg["params"][param].value = param_values[param]
# these are the parameters of the hypersurface, NOT the ones we interpolate them
# over!
hypersurface_params = []
for param_state in metadata["hypersurface_params"]:
hypersurface_params.append(HypersurfaceParam.from_state(param_state))
def find_hist_stage(pipeline):
"""Locate the index of the hist stage in a pipeline."""
hist_idx_found = False
for i, s in enumerate(pipeline.stages):
if s.__class__.__name__ == "hist":
hist_idx = i
hist_idx_found = True
break
if not hist_idx_found:
raise RuntimeError("Could not find histogram stage in pipeline, aborting.")
return hist_idx
# We create Pipeline objects, get their outputs and then forget about the Pipeline
# object on purpose! The memory requirement to hold all systematic sets at the same
# time is just too large, especially on the cluster. The way we do it below we
# only need enough memory for one dataset at a time.
for dataset in [nominal_dataset] + sys_datasets:
pipeline = Pipeline(dataset["pipeline_cfg"])
dataset["mapset"] = pipeline.get_outputs()
# get the un-weighted event counts as well so that we can exclude bins
# with too little statistics
# First, find out which stage is the hist stage
hist_idx = find_hist_stage(pipeline)
pipeline.stages[hist_idx].unweighted = True
dataset["mapset_unweighted"] = pipeline.get_outputs()
del pipeline
# Merge maps according to the combine regex, if one was provided
combine_regex = metadata["combine_regex"]
if combine_regex is not None:
for dataset in [nominal_dataset] + sys_datasets:
dataset["mapset"] = dataset["mapset"].combine_re(combine_regex)
dataset["mapset_unweighted"] = dataset["mapset_unweighted"].combine_re(combine_regex)
minimum_mc = metadata["minimum_mc"]
# Remove bins (i.e. set their count to zero) that have too few MC events
for dataset in sys_datasets + [nominal_dataset]:
for map_name in dataset["mapset"].names:
insuff_mc = dataset["mapset_unweighted"][map_name].nominal_values < minimum_mc
# Setting the hist to zero sets both nominal value and std_dev to zero
dataset["mapset"][map_name].hist[insuff_mc] = 0.
hypersurface_fit_kw = metadata["hypersurface_fit_kw"]
hypersurfaces = collections.OrderedDict()
log = metadata["log"] # flag determining whether hs fit is run in log-space or not
for map_name in nominal_dataset["mapset"].names:
nominal_map = nominal_dataset["mapset"][map_name]
nominal_param_values = nominal_dataset["sys_params"]
sys_maps = [sys_dataset["mapset"][map_name] for sys_dataset in sys_datasets]
sys_param_values = [sys_dataset["sys_params"] for sys_dataset in sys_datasets]
hypersurface = Hypersurface(
# Yes, this MUST be a deepcopy! Otherwise weird memory overwrites happen
# and all the numbers get jumbled across the hypersurfaces of different maps
params=copy.deepcopy(hypersurface_params),
initial_intercept=0. if log else 1., # Initial value for intercept
log=log
)
hypersurface.fit(
nominal_map=nominal_map,
nominal_param_values=nominal_param_values,
sys_maps=sys_maps,
sys_param_values=sys_param_values,
norm=True,
# Is the space or loading time really a problem?
# keep_maps=False, # it would take a lot more space otherwise
**hypersurface_fit_kw
)
logging.debug("\nFitted hypersurface report:\n%s" % hypersurface)
hypersurfaces[map_name] = hypersurface
gridpoint_data["hs_fit"] = hypersurfaces
gridpoint_data["fit_successful"] = True
to_json(gridpoint_data, gridpoint_json)
def prepare_interpolated_fit(
nominal_dataset, sys_datasets, params, fit_directory, interpolation_param_spec,
combine_regex=None, log=False, minimum_mc=0, **hypersurface_fit_kw
):
'''
Writes steering files for fitting hypersurfaces on a grid of arbitrary parameters.
The fits can then be run on a cluster with `run_interpolated_fit`.
Parameters
----------
nominal_dataset : dict
        Definition of the nominal dataset. Specifies the pipeline with which the maps
        can be created, and the values of all systematic parameters used to produce the
dataset.
Format must be:
nominal_dataset = {
"pipeline_cfg" = <pipeline cfg file (either cfg file path or dict)>),
"sys_params" = { param_0_name : param_0_value_in_dataset, ..., param_N_name : param_N_value_in_dataset }
}
        Sys params must correspond to the HypersurfaceParam instances provided
in the `params` arg.
sys_datasets : list of dicts
List of dicts, where each dict defines one of the systematics datasets to be
fitted. The format of each dict is the same as explained for `nominal_dataset`
params : list of HypersurfaceParams
List of HypersurfaceParams instances that define the hypersurface. Note that
        this defines ALL hypersurfaces fitted in this function, i.e. it only supports a
single parameterisation for all maps (this is almost always what you want).
    fit_directory : str
Directory in which the fits will be run. Steering files for the fits to be run
will be stored here.
combine_regex : list of str, or None
List of string regex expressions that will be used for merging maps. Used to
combine similar species. Must be something that can be passed to the
`MapSet.combine_re` function (see that functions docs for more details). Choose
        `None` if you do not want to perform this merging.
interpolation_param_spec : collections.OrderedDict
Specification of parameter grid that hypersurfaces should be interpolated over.
The dict should have the following form::
interpolation_param_spec = {
'param1': {"values": [val1_1, val1_2, ...], "scales_log": True/False}
'param2': {"values": [val2_1, val2_2, ...], "scales_log": True/False}
...
'paramN': {"values": [valN_1, valN_2, ...], "scales_log": True/False}
}
The hypersurfaces will be fit on an N-dimensional rectilinear grid over
parameters 1 to N. The flag `scales_log` indicates that the interpolation over
that parameter should happen in log-space.
minimum_mc : int, optional
Minimum number of un-weighted MC events required in each bin.
hypersurface_fit_kw : kwargs
kwargs will be passed on to the calls to `Hypersurface.fit`
'''
# Take (deep) copies of lists/dicts to avoid modifying the originals
# Useful for cases where this function is called in a loop (e.g. leave-one-out tests)
nominal_dataset = copy.deepcopy(nominal_dataset)
sys_datasets = copy.deepcopy(sys_datasets)
params = copy.deepcopy(params)
# Check types
assert isinstance(sys_datasets, collections.abc.Sequence)
assert isinstance(params, collections.abc.Sequence)
assert isinstance(fit_directory, str)
# there must not be any ambiguity between fitting the hypersurfaces and
# interpolating them later
msg = "interpolation params must be specified as a dict with ordered keys"
assert isinstance(interpolation_param_spec, collections.OrderedDict), msg
for k, v in interpolation_param_spec.items():
assert set(v.keys()) == {"values", "scales_log"}
assert isinstance(v["values"], collections.abc.Sequence)
# We need to extract the magnitudes from the Quantities to avoid a
# UnitStrippedWarning. For some reason, doing `np.min(v["values"])` messes up
# the data structure inside the values in a way that can cause a crash when we
# try to serialize the values later. Lesson: Stripping units inadvertently can
# have strange, unforeseen consequences.
mags = [x.m for x in v["values"]]
if v["scales_log"] and np.min(mags) <= 0:
raise ValueError("A log-scaling parameter cannot be equal to or less "
"than zero!")
# Check output format and path
assert os.path.isdir(fit_directory), "fit directory does not exist"
# Check formatting of datasets is as expected
all_datasets = [nominal_dataset] + sys_datasets
for dataset in all_datasets:
assert isinstance(dataset, collections.abc.Mapping)
assert "pipeline_cfg" in dataset
assert isinstance(dataset["pipeline_cfg"], (str, collections.abc.Mapping))
assert "sys_params" in dataset
assert isinstance(dataset["sys_params"], collections.abc.Mapping)
dataset["pipeline_cfg"] = serialize_pipeline_cfg(dataset["pipeline_cfg"])
# Check params
assert len(params) >= 1
for p in params:
assert isinstance(p, HypersurfaceParam)
# Report inputs
msg = "Hypersurface fit details :\n"
msg += f" Num params : {len(params)}\n"
msg += f" Num fit coefficients : {sum([p.num_fit_coeffts for p in params])}\n"
msg += f" Num datasets : 1 nominal + {len(sys_datasets)} systematics\n"
msg += f" Nominal values : {nominal_dataset['sys_params']}\n"
msg += "Hypersurface fits are prepared on the following grid:\n"
msg += str(interpolation_param_spec)
logging.info(msg)
# because we require this to be an OrderedDict, there is no ambiguity in the
# construction of the mesh here
param_names = list(interpolation_param_spec.keys())
grid_shape = tuple(len(v["values"]) for v in interpolation_param_spec.values())
# We store all information needed to run a fit in metadata
metadata = collections.OrderedDict(
interpolation_param_spec=interpolation_param_spec,
interpolation_param_names=param_names, # convenience
grid_shape=grid_shape, # convenience
nominal_dataset=nominal_dataset,
sys_datasets=sys_datasets,
hypersurface_params=params,
combine_regex=combine_regex,
log=log,
minimum_mc=minimum_mc,
hypersurface_fit_kw=hypersurface_fit_kw
)
to_json(metadata, os.path.join(fit_directory, "metadata.json"))
    # we write one JSON file for each grid point
for job_idx, grid_idx in enumerate(np.ndindex(grid_shape)):
# Although this is technically redundant, we store the parameter values
# explicitly for each grid point.
param_values = {}
for i, n in enumerate(param_names):
param_values[n] = interpolation_param_spec[n]["values"][grid_idx[i]]
gridpoint_data = {
"param_values": param_values,
"hs_fit": None,
"job_idx": job_idx,
"grid_idx": grid_idx,
"fit_successful": False
}
to_json(gridpoint_data, os.path.join(fit_directory,
f"gridpoint_{job_idx:06d}.json.bz2"))
logging.info(f"Grid fit preparation complete! Total number of jobs: {job_idx+1}")
    return job_idx + 1  # job_idx is zero-indexed, so this is the total number of jobs
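# A sketch of the typical workflow (datasets, parameters, regex and paths are
# hypothetical; in practice the individual `run_interpolated_fit` calls are usually
# submitted as separate cluster jobs):
#
#     n_jobs = prepare_interpolated_fit(
#         nominal_dataset, sys_datasets, params,
#         fit_directory="/path/to/fit_dir",
#         interpolation_param_spec=interpolation_param_spec,
#         combine_regex=["nue.*_cc", "numu.*_cc"],
#         log=False, minimum_mc=100,
#     )
#     for job_idx in range(n_jobs):
#         run_interpolated_fit("/path/to/fit_dir", job_idx, skip_successful=True)
#     assemble_interpolated_fits("/path/to/fit_dir", "/path/to/interpolated_hs.json.bz2")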
def load_interpolated_hypersurfaces(input_file, expected_binning=None):
'''
Load a set of interpolated hypersurfaces from a file.
Analogously to "load_hypersurfaces", this function returns a
collection with a HypersurfaceInterpolator object for each Map.
Parameters
----------
input_file : str
A JSON input file as produced by fit_hypersurfaces if interpolation params
were given. It has the form::
{
interpolation_param_spec = {
'param1': {"values": [val1_1, val1_2, ...], "scales_log": True/False}
'param2': {"values": [val2_1, val2_2, ...], "scales_log": True/False}
...
'paramN': {"values": [valN_1, valN_2, ...], "scales_log": True/False}
},
'hs_fits': [
<list of dicts where keys are map names such as 'nue_cc' and values
are hypersurface states>
]
}
Returns
-------
collections.OrderedDict
dictionary with a :obj:`HypersurfaceInterpolator` for each map
'''
assert isinstance(input_file, str)
if expected_binning is not None:
assert is_binning(expected_binning)
logging.info(f"Loading interpolated hypersurfaces from file: {input_file}")
# Load the data from the file
input_data = from_file(input_file)
#
# Backwards compatibility handling
#
# For older file formats (for example those used in the oscNext verification sample), some handling is
    # required to convert the input data format to match the expectation of this function
# Check for missing data
if "interpolation_param_spec" not in input_data :
# Confirm the format of what we did find
assert "interp_params" in input_data
assert "hs_fits" in input_data
assert "kind" in input_data
# The current code only handles linear interpolation
assert input_data["kind"] == "linear", f"Only linear interpolation suppored (input file specifies \'{input_data['kind']}\')"
# Populate the interpolation param spec
input_data["interpolation_param_spec"] = collections.OrderedDict()
for param_def in input_data["interp_params"] :
param_name = param_def["name"]
input_data["interpolation_param_spec"][param_name] = {
"scales_log" : False,
"values" : [ hs_fit_dict["param_values"][param_name] for hs_fit_dict in input_data["hs_fits"] ],
}
# Load the individual HS files to get the HS states (as this code now expects for the contents of `hs_fits`)
for hs_fit_dict in input_data["hs_fits"] :
hypersurfaces = load_hypersurfaces(hs_fit_dict["file"], expected_binning=expected_binning)
hs_fit_dict["hs_fit"] = collections.OrderedDict()
for key, hypersurface in hypersurfaces.items() :
hs_fit_dict["hs_fit"][key] = hypersurface
#
# Load hypersurface interpolator(s)
#
# check the file contents
assert set(['interpolation_param_spec', 'hs_fits']).issubset(
set(input_data.keys())), 'missing keys'
# input_data['hs_fits'] is a list of dicts, each dict contains "param_values"
# and "hs_fit"
map_names = None
logging.info("Reading file complete, generating hypersurfaces...")
for hs_fit_dict in input_data['hs_fits']:
# this might not be the actual Hypersurface, but a dict with the serialized Hypersurface state
hs_state_maps = hs_fit_dict["hs_fit"]
if map_names is None:
map_names = list(hs_state_maps.keys())
else:
assert set(map_names) == set(hs_state_maps.keys()), "inconsistent maps"
# When data is recovered from JSON, the object states are not automatically
# converted to the corresponding objects, so we need to do it manually here
# (unless what we loaded was already a hypersurface instance).
        for map_name in map_names:
            if not isinstance(hs_state_maps[map_name], Hypersurface):
                hs_state_maps[map_name] = Hypersurface.from_state(hs_state_maps[map_name])
logging.info(f"Read hypersurface maps: {map_names}")
# Check binning
if expected_binning is not None:
for map_name, hypersurface in hs_state_maps.items():
assert hypersurface.binning.hash == expected_binning.hash, "Binning of loaded hypersurfaces does not match the expected binning"
# Now we have a list of dicts where the map names are on the lower level.
# We need to convert this into a dict of HypersurfaceInterpolator objects.
output = collections.OrderedDict()
for m in map_names:
hs_fits = [{"param_values": fd["param_values"], "hs_fit": fd['hs_fit'][m]} for fd in input_data['hs_fits']]
output[m] = HypersurfaceInterpolator(input_data['interpolation_param_spec'], hs_fits)
return output
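# A minimal usage sketch (file name, map name and parameter name are hypothetical):
#
#     interps = load_interpolated_hypersurfaces("/path/to/interpolated_hs.json.bz2")
#     hs = interps["nue_cc"].get_hypersurface(opt_eff=1.05 * ureg.dimensionless)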