"""
Class to define binning in one dimension (OneDimBinning) and then a container
class (MultiDimBinning) for arbitrarily many dimensions (one or more). These
classes have many useful methods for working with binning.
"""
# TODO: include Iterables where only Sequence is allowed now?
# TODO: iterbins, itercoords are _*slow*_. Figure out how to speed these up, if
# that is possible in pure-Python loops... E.g.
# `indices = [i for i in range(mdb.size)]`
# takes 70 ms while
# `coords = [c for c in mdb.itercoords()]`
# takes 10 seconds.
# TODO: Create non-validated version of OneDimBinning.__init__ to make
# iterbins() fast
# TODO: explicitly set is_bin_spacing_log_uniform and
# is_bin_spacing_lin_uniform to FP32 precision (since binning can be
# defined/saved in FP32 but want code able to run in FP64)
from __future__ import absolute_import, division
from collections.abc import Iterable, Mapping, Sequence
from collections import OrderedDict, namedtuple
from copy import copy, deepcopy
from functools import reduce, wraps
from itertools import chain, product
from operator import mul
import re
import numpy as np
from pisa import FTYPE, HASH_SIGFIGS, ureg
from pisa.utils.comparisons import interpret_quantity, normQuant, recursiveEquality
from pisa.utils.comparisons import ALLCLOSE_KW
from pisa.utils.format import (make_valid_python_name, text2tex,
strip_outer_dollars)
from pisa.utils.hash import hash_obj
from pisa.utils import jsons
from pisa.utils.log import logging, set_verbosity, tprofile
__all__ = ['NAME_FIXES', 'NAME_SEPCHARS', 'NAME_FIXES_REGEXES',
'basename', '_new_obj', 'is_binning',
'OneDimBinning', 'MultiDimBinning',
'test_OneDimBinning', 'test_MultiDimBinning']
__author__ = 'J.L. Lanfranchi'
__license__ = '''Copyright (c) 2014-2017, The IceCube Collaboration
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.'''
NAME_FIXES = ('tru(e|th)', 'reco(nstruct(ed)?)?')
NAME_SEPCHARS = r'[_\s-]+'
NAME_FIXES_REGEXES = tuple(
[re.compile(NAME_SEPCHARS + p, re.IGNORECASE) for p in NAME_FIXES]
+ [re.compile(p + NAME_SEPCHARS, re.IGNORECASE) for p in NAME_FIXES]
+ [re.compile(p, re.IGNORECASE) for p in NAME_FIXES]
)
# TODO: move this to a centralized utils location
def basename(n):
"""Remove "true" or "reco" prefix(es) and/or suffix(es) from binning
name `n` along with any number of possible separator characters.
* Valid (pre/suf)fix(es): "true", "reco"
* Valid separator characters: "<whitespace>", "_", "-" (any number)
Parameters
----------
n : string or OneDimBinning
Name from which to have pre/suffixes stripped.
Returns
-------
basename : string
Examples
--------
>>> print(basename('true_energy'))
energy
>>> print(basename('Reconstructed coszen'))
coszen
>>> print(basename('coszen reco'))
coszen
>>> print(basename('energy___truth'))
energy
>>> print(basename('trueenergy'))
energy
>>> print(basename('energytruth'))
energy
"""
    # Type checking and conversion
orig_type = type(n)
if isinstance(n, OneDimBinning):
n = n.name
if not isinstance(n, str):
raise ValueError('Unhandled type %s' %orig_type)
# Remove all (pre/suf)fixes and any separator chars
for regex in NAME_FIXES_REGEXES:
n = regex.sub('', n)
return n.strip()
def is_binning(something):
"""Return True if argument is a PISA binning (of any dimension), False
otherwise"""
return isinstance(something, (OneDimBinning, MultiDimBinning))
# TODO: generalize to any object and move this to a centralized utils location
def _new_obj(original_function):
"""Decorator to deepcopy unaltered states into new OneDimBinning object."""
@wraps(original_function)
def new_function(cls, *args, **kwargs):
"""<< docstring will be inherited from wrapped function >>"""
new_state = OrderedDict()
state_updates = original_function(cls, *args, **kwargs)
for attr in cls._attrs_to_create_new: # pylint: disable=protected-access
if attr in state_updates:
new_state[attr] = state_updates[attr]
else:
new_state[attr] = deepcopy(getattr(cls, attr))
return OneDimBinning(**new_state)
return new_function
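# Illustrative sketch (hypothetical method) of how `_new_obj` is used: a
# decorated method returns a dict containing only the state entries it wants
# changed; the decorator deep-copies every remaining attribute named in
# `_attrs_to_create_new` from the source object and builds a new OneDimBinning
# from the combined state:
#
#     @_new_obj
#     def renamed(self, new_name):
#         return {'name': new_name}  # all other attrs copied from `self`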
class OneDimBinning(object):
# pylint: disable=line-too-long
"""Histogram-oriented binning specialized to a single dimension.
If neither `is_lin` nor `is_log` is specified, linear behavior
is assumed (i.e., `is_lin` is set to True).
Parameters
----------
name : str, of length > 0
Name for this dimension. Must be valid Python name (since it will be
accessed with the dot operator). If not, name will be converted to a
valid Python name.
tex : str or None
TeX label for this dimension.
bin_edges : sequence of scalars, or None
Numerical values (optionally including Pint units) that represent the
*edges* of the bins. `bin_edges` needn't be specified if `domain`,
`num_bins`, and optionally `is_log` is specified. Pint units can be
attached to `bin_edges`, but will be converted to `units` if this
argument is specified.
units : Pint unit or object convertible to Pint unit, or None
If None, units will be read from either `bin_edges` or `domain`, and if
none of these have units, the binning has unit 'dimensionless'
attached.
is_lin : bool or None
Binning behavior is linear for purposes of resampling, plotting, etc.
Mutually exclusive with `is_log`. If neither `is_lin` or `is_log` is
True (i.e., both are None), default behavior is linear (`is_lin` is set
to True internally).
is_log : bool or None
Binning behavior is logarithmic for purposes of resampling, plotting,
etc. Mutually exclusive with `is_lin`. If neither `is_lin` or `is_log`
is True (i.e., both are None), default behavior is linear (`is_lin` is
set to True internally).
domain : length-2 sequence of scalars, or None
Units may be specified. Required along with `num_bins` if `bin_edges`
is not specified (optionally specify `is_log=True` to define the
`bin_edges` to be log-uniform).
num_bins : int or None
Number of bins. Required along with `domain` if `bin_edges` is not
specified (optionally specify `is_log=True` to define the `bin_edges`
to be log-uniform).
bin_names : sequence of nonzero-length strings, or None
Strings by which each bin can be identified. This is expected to be
useful when one needs to easily identify bins by name where the actual
numerical values can be non-obvious e.g. the PID dimension.
None is also acceptable if there is no reason to name the bins.
Notes
-----
Consistency is enforced for all redundant parameters passed to the
constructor.
You can avoid passing `bin_edges` if `num_bins` and `domain` are specified.
Specify `is_lin=True` or `is_log=True` to define the binning to be linear
or logarithmic (but note that if neither is specified as True, linear
behavior is the default).
Be careful, though, since bin edges will be defined slightly differently
depending on the ``pisa.FTYPE`` defined (PISA_FTYPE environment variable).
Examples
--------
>>> from pisa import ureg
>>> from pisa.core.binning import OneDimBinning
>>> ebins = OneDimBinning(name='energy', is_log=True,
... num_bins=40, domain=[1, 100]*ureg.GeV)
>>> print(ebins)
OneDimBinning('energy', 40 logarithmically-regular bins spanning [1.0, 100.0] GeV (behavior is logarithmic))
>>> ebins2 = ebins.to('joule')
>>> print(ebins2)
OneDimBinning('energy', 40 logarithmically-regular bins spanning [1.6021766339999998e-10, 1.602176634e-08] J (behavior is logarithmic))
>>> czbins = OneDimBinning(name='coszen',
... is_lin=True, num_bins=4, domain=[-1, 0])
>>> print(czbins)
OneDimBinning('coszen', 4 linearly-regular bins spanning [-1.0, 0.0] (behavior is linear))
>>> czbins2 = OneDimBinning(name='coszen',
... bin_edges=[-1, -0.75, -0.5, -0.25, 0])
>>> czbins == czbins2
True
"""
# pylint: enable=line-too-long
# NOTE: Only one of `is_log` or `is_lin` is technically required for state,
# since it's either one or the other, but these used to imply
    # log-_uniform_ and linear-_uniform_ behavior. As nothing fundamentally
# in the behavior is changed by assuming uniformity, and to keep
# backwards compatibility (including for state / hashes), both are kept
# (for now) as "state" variables. -JLL, April, 2020
_attrs_to_create_new = ('name', 'tex', 'bin_edges', 'is_log', 'is_lin', 'bin_names')
def __init__(self, name, tex=None, bin_edges=None, units=None, domain=None,
num_bins=None, is_lin=None, is_log=None, bin_names=None):
# Basic validation and translation of args; note that iterables are
# converted to sequences later on
if not isinstance(name, str):
raise TypeError('`name` must be a string; got "%s".' %type(name))
if domain is not None:
assert (
isinstance(domain, Iterable)
or (isinstance(domain, ureg.Quantity) and domain.size > 1)
)
if bin_names is not None:
if isinstance(bin_names, str):
bin_names = (bin_names,)
if (isinstance(bin_names, Iterable)
and all(isinstance(n, str) and n
for n in bin_names)):
bin_names = tuple(bin_names)
else:
raise ValueError(
'`bin_names` must either be None or an iterable of'
' nonzero-length strings.'
)
if bin_edges is not None:
assert (
isinstance(bin_edges, Iterable)
or (isinstance(bin_edges, ureg.Quantity) and bin_edges.size > 1)
)
if domain is not None:
raise ValueError(
'Both `domain` and `bin_edges` are specified.'
)
# Type checking
assert is_lin is None or isinstance(is_lin, bool), str(type(is_lin))
assert is_log is None or isinstance(is_log, bool), str(type(is_log))
        if is_lin is None and is_log is None:  # neither specified: default to linear
            is_lin = True
            is_log = not is_lin
        elif is_log is None:  # `is_lin` specified but not `is_log`
            is_log = not is_lin
        elif is_lin is None:  # `is_log` specified but not `is_lin`
            is_lin = not is_log
        else:  # both specified: check consistency
            if is_log == is_lin:
                raise ValueError(
                    '`is_log=%s` contradicts `is_lin=%s`' % (is_log, is_lin)
                )
self._normalize_values = True
self._name = make_valid_python_name(name)
if self._name != name:
logging.warning('Converted `name` "%s" to valid Python: "%s"',
name, self._name)
self._tex = tex
self._basename = None
self._bin_names = bin_names
self._hashable_state = None
self._serializable_state = None
self._normalized_state = None
self._midpoints = None
self._weighted_centers = None
self._edge_magnitudes = None
self._bin_widths = None
self._weighted_bin_widths = None
self._inbounds_criteria = None
# TODO: define hash based upon conversion of things to base units (such
        # that a valid comparison can be made between identical binnings but
# that use different units). Be careful to round to just less than
# double-precision limits after conversion so that hashes will work out
# to be the same after conversion to the base units.
self._hash = None
self._edges_hash = None
# Figure out the units (if any) for each quantity passed in. Precedence
# for units is:
# 1. `units`
# 2. `bin_edges`
# 3. `domain`
# 4. default to units of `ureg.dimensionless`
if units is not None:
if isinstance(units, ureg.Quantity):
units = units.units
elif not isinstance(units, ureg.Unit):
units = ureg.Unit(units)
units_dimensionality = units.dimensionality
dimensionless_bin_edges = None
if bin_edges is not None:
if isinstance(bin_edges, ureg.Quantity):
be_units = bin_edges.units
if units is None:
units = be_units
else:
if be_units.dimensionality != units_dimensionality:
raise ValueError(
'`bin_edges` units %s are incompatible with units'
' %s.' % (be_units, units)
)
if be_units != units:
logging.warning(
'`bin_edges` are specified in units of %s'
' but `units` is specified as %s.'
' Converting `bin_edges` to the latter.',
be_units, units
)
bin_edges.ito(units)
dimensionless_bin_edges = bin_edges.magnitude
elif bin_edges is not None:
dimensionless_bin_edges = np.asarray(bin_edges, dtype=FTYPE)
bin_edges = None
dimensionless_domain = None
if domain is not None:
if isinstance(domain, ureg.Quantity):
domain_units = domain.units
if units is None:
units = domain_units
else:
if domain_units.dimensionality != units_dimensionality:
raise ValueError(
                            '`domain` units %s are incompatible with units'
' %s.' % (domain_units, units)
)
if domain_units != units:
logging.warning(
'`domain` units %s will be converted to' ' %s.',
domain_units,
units,
)
domain.ito(units)
dimensionless_domain = domain.magnitude
else:
domain_lower_is_quant = isinstance(domain[0], ureg.Quantity)
domain_upper_is_quant = isinstance(domain[1], ureg.Quantity)
assert domain_lower_is_quant == domain_upper_is_quant
if domain_lower_is_quant:
assert domain[0].dimensionality == domain[1].dimensionality
if domain[1].units != domain[0].units:
domain[1] = domain[1].to(domain[0].units)
dimensionless_domain = (domain[0].magnitude,
domain[1].magnitude)
else:
dimensionless_domain = tuple(domain)
domain = None
# If no units have been discovered from the input args, assign default
# units
if units is None:
units = ureg.dimensionless
if dimensionless_bin_edges is None:
if num_bins is None or dimensionless_domain is None:
raise ValueError(
'If not specifying bin edges explicitly, `domain` and'
' `num_bins` must be specified (and optionally set'
' `is_log=True`).'
)
if is_log:
dimensionless_bin_edges = np.logspace(
np.log10(dimensionless_domain[0]),
np.log10(dimensionless_domain[1]),
num_bins + 1,
dtype=FTYPE,
)
else: # is_lin
dimensionless_bin_edges = np.linspace(
dimensionless_domain[0],
dimensionless_domain[1],
num_bins + 1,
dtype=FTYPE,
)
elif dimensionless_domain is not None:
assert dimensionless_domain[0] == dimensionless_bin_edges[0]
assert dimensionless_domain[1] == dimensionless_bin_edges[-1]
# TODO: should we warn a user if logarithmically- or linearly-uniform
# bin_edges are passed while is_log, is_lin, or the default (is_lin)
# "contradict" this?
# if not (is_lin or is_log): # infer is_log/is_lin from spacing if not set
# is_lin = self.is_bin_spacing_lin_uniform(dimensionless_bin_edges)
# try:
# is_log = self.is_bin_spacing_log_uniform(dimensionless_bin_edges)
# except ValueError:
# is_log = False
if dimensionless_domain is None:
dimensionless_domain = (dimensionless_bin_edges[0],
dimensionless_bin_edges[-1])
if bin_edges is None:
self._bin_edges = dimensionless_bin_edges * units
else:
self._bin_edges = bin_edges
if domain is None:
self._domain = dimensionless_domain * units
else:
self._domain = domain
self._units = units
# Derive rest of unspecified parameters from bin_edges or enforce
# them if they were specified as arguments to init
if num_bins is None:
num_bins = len(self.bin_edges) - 1
else:
assert num_bins == len(self.bin_edges) - 1, \
'%s, %s' %(num_bins, self.bin_edges)
self._num_bins = num_bins
if (self._bin_names is not None
and len(self._bin_names) != self._num_bins):
raise ValueError(
'There are %d bins, so there must be %d `bin_names` (or None)'
                ' provided; got %d bin names instead: %s.'
% (self._num_bins, self._num_bins, len(self._bin_names),
self._bin_names)
)
self._is_log = is_log
self._is_lin = not self._is_log
self._is_irregular = None
def __repr__(self):
previous_precision = np.get_printoptions()['precision']
np.set_printoptions(precision=18)
try:
argstrs = [('%s=%r' %item) for item in
self.serializable_state.items()]
r = '%s(%s)' %(self.__class__.__name__, ',\n '.join(argstrs))
finally:
np.set_printoptions(precision=previous_precision)
return r
def __str__(self):
lin_reg = "linearly-regular "
log_reg = "logarithmically-regular "
regularity = None
if self.is_irregular:
# Test for regularity in the other domain
if self.is_lin and self.is_bin_spacing_log_uniform(self.bin_edges):
regularity = log_reg
elif self.is_log and self.is_bin_spacing_lin_uniform(self.bin_edges):
regularity = lin_reg
else:
regularity = log_reg if self.is_log else lin_reg
if regularity is None or self.num_bins == 1:
edges = 'with edges at [{}]{}'.format(
', '.join(str(e) for e in self.bin_edges.m),
format(self.bin_edges.u, '~'),
).strip()
regularity = ""
else:
edges = 'spanning [{}, {}] {:s}'.format(
self.bin_edges[0].magnitude,
self.bin_edges[-1].magnitude,
format(self.units, '~'),
).strip()
plural = '' if self.num_bins == 1 else 's'
linlog = 'logarithmic' if self.is_log else 'linear'
if self.bin_names is None:
bnames = ""
else:
bnames = ', bin_names=[{}]'.format(
', '.join(f"'{n:s}'" for n in self.bin_names)
)
descr = (
f'{self.num_bins:d} {regularity:s}bin{plural:s} {edges:s}'
f' (behavior is {linlog:s}){bnames:s}'
)
return f"{self.__class__.__name__:s}('{self.name:s}', {descr:s})"
def __pretty__(self, p, cycle):
"""Method used by the `pretty` library for formatting"""
if cycle:
p.text('%s(...)' % self.__class__.__name__)
else:
p.begin_group(4, '%s' % self)
p.end_group(4, ')')
def _repr_pretty_(self, p, cycle):
"""Method used by e.g. ipython/Jupyter for formatting"""
return self.__pretty__(p, cycle)
def __getstate__(self):
"""Method invoked during pickling"""
return self.serializable_state
def __setstate__(self, state):
"""Method invoked during unpickling"""
self.__init__(**state)
def to_json(self, filename, **kwargs):
"""Serialize the state to a JSON file that can be instantiated as a new
object later.
Parameters
----------
filename : str
Filename; must be either a relative or absolute path (*not
interpreted as a PISA resource specification*)
**kwargs
Further keyword args are sent to `pisa.utils.jsons.to_json()`
See Also
--------
from_json : Instantiate new OneDimBinning object from the file written
by this method
"""
jsons.to_json(self.serializable_state, filename=filename, **kwargs)
@classmethod
def from_json(cls, resource):
"""Instantiate a new object from the contents of a JSON file as
formatted by the `to_json` method.
Parameters
----------
resource : str
A PISA resource specification (see pisa.utils.resources)
See Also
--------
to_json
"""
state = jsons.from_json(resource)
return cls(**state)
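    # Illustrative JSON round trip (hypothetical path; `ebins` as in the
    # class docstring):
    #     ebins.to_json('/tmp/ebins.json')
    #     assert OneDimBinning.from_json('/tmp/ebins.json') == ebins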
def __contains__(self, x):
try:
self.index(x)
except ValueError:
return False
return True
def index(self, x):
"""Return integer index of bin identified by `x`.
Parameters
----------
x : int, string
If int, ensure it is a valid index and return; if string, look for
bin with corresponding name.
Returns
-------
idx: int
index of bin corresponding to `x`
Raises
------
ValueError if `x` cannot identify a valid bin
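
        Examples
        --------
        A minimal sketch; the edges and bin names below are arbitrary:

        >>> dim = OneDimBinning('pid', bin_edges=[0, 1, 2],
        ...                     bin_names=['cascade', 'track'])
        >>> dim.index('track')
        1
        >>> dim.index(0)
        0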
"""
try:
if isinstance(x, str):
assert self.bin_names is not None
return self.bin_names.index(x)
if isinstance(x, int):
assert 0 <= x < len(self)
return x
raise TypeError('`x` must be either int or string; got %s instead.'
% type(x))
except (AssertionError, ValueError):
valid_range = [0, len(self)-1]
if self.bin_names is None:
valid_names = ''
else:
valid_names = ' or a valid bin name in %s' % (self.bin_names,)
raise ValueError('Bin corresponding to "%s" could not be located.'
' Specify an int in %s%s.'
% (x, valid_range, valid_names))
def iterbins(self):
"""Return an iterator over each bin. The elments returned by the
iterator are each a OneDimBinning object, just containing a single bin.
Note that for one test, `iterbins` is about 500x slower than
`iteredgetuples`.
Returns
-------
bin_iterator
See Also
--------
iteredgetuples
Faster but only returns edges of bins, not OneDimBinning objects.
"""
return (self[i] for i in range(len(self)))
def iteredgetuples(self):
"""Return an iterator over each bin's edges. The elments returned by
the iterator are each a tuple, containing the edges of the bin. Units
are stripped prior to iteration for purposes of speed.
Returns
-------
edges_iterator
See Also
--------
iterbins
Similar, but returns a OneDimBinning object for each bin; slower
than this method (by as much as 500x in one test) but easier to
work with.
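
        Examples
        --------
        A minimal sketch; the (dimensionless) edges below are arbitrary:

        >>> dim = OneDimBinning('x', bin_edges=[0., 1., 2.])
        >>> for e0, e1 in dim.iteredgetuples():
        ...     print('(%g, %g)' % (e0, e1))
        (0, 1)
        (1, 2)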
"""
mags = self.edge_magnitudes
return ((e0, e1) for e0, e1 in zip(mags[:-1], mags[1:]))
@property
def serializable_state(self):
"""OrderedDict containing savable state attributes"""
if self._serializable_state is None:
state = OrderedDict()
state['name'] = self.name
state['bin_edges'] = self.edge_magnitudes
state['units'] = str(self.units)
state['is_log'] = self.is_log
state['is_lin'] = self.is_lin
state['bin_names'] = self.bin_names
self._serializable_state = state
# Since the tex property can be modified, must set every time this
# property is called
self._serializable_state['tex'] = self.tex
return self._serializable_state
@property
def hashable_state(self):
"""OrderedDict containing simplified state attributes (i.e. some state
attributes are represented by their hashes) used for testing equality
between two objects.
Use `hashable_state` for faster equality checks and `normalized_state`
for inspecting the contents of each state attribute pre-hashing
"""
if self._hashable_state is None:
state = OrderedDict()
state['name'] = self.name
state['edges_hash'] = self.edges_hash
state['is_log'] = self.is_log
state['is_lin'] = self.is_lin
state['bin_names'] = self.bin_names
self._hashable_state = state
return self._hashable_state
@property
def normalized_state(self):
"""OrderedDict containing normalized (base units, and rounded to
appropriate precision) state attributes used for testing equality
between two objects.
Use `hashable_state` for faster equality checks and `normalized_state`
for inspecting the contents of each state attribute pre-hashing
"""
if self._normalized_state is None:
state = OrderedDict()
state['name'] = self.name
bin_edges = normQuant(self.bin_edges, sigfigs=HASH_SIGFIGS)
state['bin_edges'] = bin_edges
state['is_log'] = self.is_log
state['is_lin'] = self.is_lin
state['bin_names'] = self.bin_names
self._normalized_state = state
return self._normalized_state
@property
def edge_magnitudes(self):
"""Bin edges' magnitudes"""
if self._edge_magnitudes is None:
self._edge_magnitudes = self.bin_edges.magnitude
return self._edge_magnitudes
@property
def name(self):
"""Name of the dimension"""
return self._name
@property
def basename(self):
"""Basename of the dimension, stripping "true", "reco", underscores,
whitespace, etc. from the `name` attribute."""
if self._basename is None:
self._basename = basename(self.name)
return self._basename
# TODO: reimplement just the translate-on-input (or not?), but is this a
# performance hit for e.g. iterbins()? That could argue for
# translate-on-output...
@property
def tex(self):
"""string : TeX label"""
if self._tex is None:
return text2tex(self.name)
return self._tex
@tex.setter
def tex(self, val):
"""None or TeX string for dimension; surrounding dollars-signs ($) are
stripped off (and must be added prior to e.g. plotting)"""
assert val is None or isinstance(val, str)
if val is not None:
val = strip_outer_dollars(val)
self._tex = val
@property
def label(self):
"""TeX-formatted axis label, including units (if not dimensionless)"""
if self.tex is None:
name_tex = r'{\rm %s}' % text2tex(self.name)
else:
name_tex = self.tex
if self.units == ureg.dimensionless:
units_tex = ''
else:
units_tex = r' \; \left( {:~L} \right)'.format(self.units)
return name_tex + units_tex
@property
def shape(self):
"""tuple : shape of binning, akin to `nump.ndarray.shape`"""
return (self.num_bins,)
@property
def size(self):
"""int : total number of bins"""
return self.num_bins
@property
def bin_edges(self):
"""array : Edges of the bins."""
return self._bin_edges
@property
def bin_names(self):
"""list of strings or None : Bin names"""
return self._bin_names
@property
def domain(self):
"""array : domain of the binning, (min, max) bin edges"""
if self._domain is None:
bin_edges = self.edge_magnitudes
self._domain = np.array([np.min(bin_edges),
np.max(bin_edges)]) * self.units
return self._domain
@property
def range(self):
"""float : range of the binning, (max-min) bin edges"""
domain = self.domain
return domain[1] - domain[0]
@property
def units(self):
"""pint.Unit : units of the bins' edges"""
return self._units
@units.setter
def units(self, u):
"""str or pint.Unit : units of the bins' edges"""
self.ito(u)
@property
def num_bins(self):
"""int : Number of bins"""
return self._num_bins
@property
def is_lin(self):
"""bool : Whether binning is to be treated in a linear space"""
return self._is_lin
@is_lin.setter
def is_lin(self, b):
"""bool"""
assert isinstance(b, bool)
if b != self._is_lin:
# NOTE: use tuple unpacking to help ensure state is consistent
(
self._is_lin,
self._is_log,
self._is_irregular,
self._weighted_centers,
self._weighted_bin_widths,
) = (b, not b, None, None, None)
@property
def is_log(self):
"""bool : Whether binning is to be treated in a log space"""
return self._is_log
@is_log.setter
def is_log(self, b):
"""bool"""
assert isinstance(b, bool)
if b != self._is_log:
# NOTE: use tuple unpacking to help ensure state is consistent
(
self._is_log,
self._is_lin,
self._is_irregular,
self._weighted_centers,
self._weighted_bin_widths,
) = (b, not b, None, None, None)
@property
def is_irregular(self):
"""bool : True if bin spacing is not unform in the space defined (i.e.,
NOT linearly-uniform if `is_lin` or NOT logarithmically-uniform if
`is_log`)."""
if self._is_irregular is None:
if self.num_bins == 1:
self._is_irregular = False
elif self.is_log:
self._is_irregular = not self.is_bin_spacing_log_uniform(self.bin_edges)
else: # self.is_lin
self._is_irregular = not self.is_bin_spacing_lin_uniform(self.bin_edges)
return self._is_irregular
@property
def midpoints(self):
"""array : Midpoints of the bins: linear average of each bin's
edges."""
if self._midpoints is None:
self._midpoints = (self.bin_edges[:-1] + self.bin_edges[1:])/2.0
return self._midpoints
@property
def weighted_centers(self):
"""array : Centers of the bins taking e.g. logarithmic behavior
into account. I.e., if binning is logarithmic, this is **not**
        the same as `midpoints`, whereas in all other cases, it is identical."""
if self._weighted_centers is None:
if self.is_log:
self._weighted_centers = np.sqrt(self.bin_edges[:-1] *
self.bin_edges[1:])
else:
self._weighted_centers = self.midpoints
return self._weighted_centers
@property
def hash(self):
"""int : Hash value based upon less-than-double-precision-rounded
numerical values and any other state (includes name, tex, is_log, and
is_lin attributes). Rounding is done to `HASH_SIGFIGS` significant
figures.
Set this class attribute to None to keep full numerical precision in
the values hashed (but be aware that this can cause equal things
defined using different unit orders-of-magnitude to hash differently).
"""
if self._hash is None:
s = self.hashable_state
self._hash = hash_obj(s)
return self._hash
def rehash(self):
"""Force `hash` and `edges_hash` attributes to be recomputed"""
self._hash = None
self._edges_hash = None
_ = self.hash
_ = self.edges_hash
def __hash__(self):
return self.hash
@property
def normalize_values(self):
"""bool : Whether to normalize quantities' units prior to hashing"""
return self._normalize_values
@normalize_values.setter
def normalize_values(self, b):
assert isinstance(b, bool)
if b == self._normalize_values:
return
# Invalidate the hash, since the hashing behavior has changed
self._hash = None
self._edges_hash = None
self._normalize_values = b
@property
def edges_hash(self):
"""Hash value based *solely* upon bin edges' values.
The hash value is obtained on the edges after "normalizing" their
values if `self.normalize_values` is True; see
        `pisa.utils.comparisons.normQuant` for details of the normalization
process.
"""
if self._edges_hash is None:
if self.normalize_values:
bin_edges = normQuant(self.bin_edges, sigfigs=HASH_SIGFIGS)
else:
bin_edges = self.bin_edges
self._edges_hash = hash_obj(bin_edges)
return self._edges_hash
@property
def bin_widths(self):
"""Absolute widths of bins."""
if self._bin_widths is None:
self._bin_widths = np.abs(np.diff(self.bin_edges.m)) * self.units
return self._bin_widths
@property
def weighted_bin_widths(self):
"""Absolute widths of bins."""
if self._weighted_bin_widths is None:
if self.is_log:
self._weighted_bin_widths = (
np.log(self.edge_magnitudes[1:] / self.edge_magnitudes[:-1])
) * ureg.dimensionless
else:
self._weighted_bin_widths = self.bin_widths
return self._weighted_bin_widths
@property
def inbounds_criteria(self):
"""Return string boolean criteria indicating e.g. an event falls within
the limits of the defined binning.
This can be used for e.g. applying cuts to events.
See Also
--------
pisa.core.events.keepInbounds
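
        Examples
        --------
        A minimal sketch; the edges below are arbitrary:

        >>> dim = OneDimBinning('energy', bin_edges=[1., 10.])
        >>> dim.inbounds_criteria
        '(energy >= 1.000000000000000e+00) & (energy <= 1.000000000000000e+01)'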
"""
if self._inbounds_criteria is None:
be = self.edge_magnitudes
crit = '(%s >= %.15e) & (%s <= %.15e)' % (self.name, min(be),
self.name, max(be))
self._inbounds_criteria = crit
return self._inbounds_criteria
def __len__(self):
"""Number of bins (*not* number of bin edges)."""
return self.num_bins
def __mul__(self, other):
if isinstance(other, OneDimBinning):
return MultiDimBinning([self, other])
if isinstance(other, MultiDimBinning):
return MultiDimBinning(chain([self], other))
return OneDimBinning(name=self.name, tex=self.tex,
bin_edges=self.bin_edges * other)
# TODO: if same or contained dimension, modify the current binning OR
# create a smarter MultiDimBinning object that allows for multiple
# disconnected binning regions with arbitrary binning within each
# region
def __add__(self, other):
if isinstance(other, OneDimBinning):
return MultiDimBinning([self, other])
if isinstance(other, MultiDimBinning):
return MultiDimBinning(chain([self], other))
other = interpret_quantity(other, expect_sequence=True)
new_bin_edges = self.bin_edges + other
return OneDimBinning(name=self.name, tex=self.tex, bin_edges=new_bin_edges)
@_new_obj
def __deepcopy__(self, memo):
"""Explicit deepcopy constructor"""
return {}
@staticmethod
def is_binning_ok(bin_edges):
"""Check that there are 2 or more bin edges, and that they are
monotonically increasing.
Parameters
----------
bin_edges : sequence
Bin edges to check the validity of
Returns
-------
bool, True if binning is OK, False if not
"""
# Must be at least two edges to define a single bin
if len(bin_edges) < 2:
return False
# Bin edges must be monotonic and strictly increasing
if np.any(np.diff(bin_edges) <= 0):
return False
return True
# TODO: as of now, only downsampling is allowed. Is this reasonable?
def is_compat(self, other):
"""Compatibility -- for now -- is defined by all of self's bin
edges form a subset of other's bin edges (i.e. you can downsample to
get from the other binning to this binning), and the units must be
compatible.
Note that this might bear revisiting, or redefining just for special
circumstances.
Parameters
----------
other : OneDimBinning
Returns
-------
bool
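
        Examples
        --------
        A minimal sketch; the edges below are arbitrary. A coarse binning is
        compatible with a finer binning whose edges are a superset of its
        own, but not vice versa:

        >>> coarse = OneDimBinning('x', bin_edges=[0., 2., 4.])
        >>> fine = OneDimBinning('x', bin_edges=[0., 1., 2., 3., 4.])
        >>> coarse.is_compat(fine)
        True
        >>> fine.is_compat(coarse)
        False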
"""
if self.name != other.name:
logging.trace('Dimension names do not match')
return False
if self.units.dimensionality != other.units.dimensionality:
logging.trace('Incompatible units')
return False
# TODO: should we force normalization?
# TODO: Should we use FTYPE_SIGFIGS or # HASH_SIGFIGS?
if self.normalize_values:
my_normed_bin_edges = set(
normQuant(self.bin_edges, sigfigs=HASH_SIGFIGS).magnitude
)
other_normed_bin_edges = set(
normQuant(other.bin_edges, sigfigs=HASH_SIGFIGS).magnitude
)
else:
my_normed_bin_edges = set(self.bin_edges.magnitude)
other_normed_bin_edges = set(other.bin_edges.magnitude)
if my_normed_bin_edges.issubset(other_normed_bin_edges):
return True
logging.trace('self.bin_edges not a subset of other.bin_edges')
logging.trace('Bins in this map not found in other = %s',
my_normed_bin_edges.difference(other_normed_bin_edges))
return False
def assert_compat(self, other):
"""Assert that this binning is compatible with `other`."""
if not self.is_compat(other):
raise AssertionError(f"incompatible {self.name} binning")
@property
@_new_obj
def basename_binning(self):
"""Identical binning but named as the basename of this binning. Note
that the `tex` property is not carried over into the new binning."""
return {'name': self.basename, 'tex': None}
@property
@_new_obj
def finite_binning(self):
"""Identical binning but with infinities in bin edges replaced by
largest/smallest floating-point numbers representable with the current
pisa.FTYPE."""
float_info = np.finfo(FTYPE)
finite_edges = np.clip(self.edge_magnitudes, a_min=float_info.min,
a_max=float_info.max)
return {'bin_edges': finite_edges}
@_new_obj
def oversample(self, factor):
"""Return a OneDimBinning object oversampled relative to this object's
binning.
Parameters
----------
factor : integer
Factor by which to oversample the binning, with `factor`-times
as many bins (*not* bin edges) as this object has.
Returns
-------
new_binning : OneDimBinning
New binning, oversampled from the current binning.
Raises
------
ValueError if illegal value is specified for `factor`
Notes
-----
Bin names are _not_ preserved for any `factor` except 1 since it is
ambiguous how names should be propagated. If you wish to have bin
names after oversampling, assign them afterwards.
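
        Examples
        --------
        A minimal sketch; the edges below are arbitrary:

        >>> dim = OneDimBinning('x', bin_edges=[0., 1., 2.])
        >>> dim.oversample(2).num_bins
        4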
"""
if factor < 1 or factor != int(factor):
raise ValueError('`factor` must be integer >= 1; got %s' %factor)
factor = int(factor)
if factor == 1:
return self
if self.is_log:
spacing_func = np.geomspace
else: # is_lin
spacing_func = np.linspace
old_bin_edges = self.edge_magnitudes
new_bin_edges = []
for old_lower, old_upper in zip(old_bin_edges[:-1], old_bin_edges[1:]):
thisbin_new_edges = spacing_func(old_lower, old_upper, factor + 1)
# Use the original lower bin edge to avoid precision issues with
# its version created by `spacing_func`
new_bin_edges.append(old_lower)
# Add the new bin edges we created in between lower and upper; we
# omit the upper bin edge because it is the first bin edge of the
# next bin
new_bin_edges.extend(thisbin_new_edges[1:-1])
# Include the uppermost bin edge from original binning
new_bin_edges.append(old_upper)
return {'bin_edges': new_bin_edges * self.units, 'bin_names': None}
# TODO: do something cute with bin names, if they exist?
@_new_obj
def downsample(self, factor):
"""Downsample the binning by an integer factor that evenly divides the
current number of bins.
Parameters
----------
factor : int >= 1
Downsampling factor that evenly divides the current number of
bins. E.g., if the current number of bins is 4, `factor` can be
one of 1, 2, or 4. Note that floats are converted into integers
if `float(factor) == int(factor)`.
Returns
-------
new_binning : OneDimBinning
New binning, downsampled from the current binning.
Raises
------
ValueError if illegal value is specified for `factor`
Notes
-----
Bin names are _not_ preserved for any `factor` except 1 since it is
ambiguous how names should be propagated. If you wish to have bin
names after downsampling, assign them afterwards.
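
        Examples
        --------
        A minimal sketch; the edges below are arbitrary:

        >>> dim = OneDimBinning('x', bin_edges=[0., 1., 2., 3., 4.])
        >>> dim.downsample(2).num_bins
        2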
"""
if int(factor) != float(factor):
raise ValueError('Floating point `factor` is non-integral.')
factor = int(factor)
if factor == 1:
return self
if factor < 1 or factor > self.num_bins:
raise ValueError(
'`factor` %d is out of range; must be >= 1 and <= number of'
' bins (%d).' % (factor, self.num_bins)
)
if self.num_bins % factor != 0:
raise ValueError(
'`factor` %d does not evenly divide number of bins (%d).'
% (factor, self.num_bins)
)
return {'bin_edges': self.bin_edges[::factor],
'bin_names': None}
def ito(self, units):
"""Convert units in-place. Cf. Pint's `ito` method."""
if units is None:
units = ''
units = ureg.Unit(units)
if units == self._units:
return
self._units = units
# Invalidate (expensive) derived properties that rely on units
for attr in ['_inbounds_criteria']:
setattr(self, attr, None)
# Convert already-defined quantities
attrs = [
'_bin_edges',
'_domain',
'_midpoints',
'_weighted_centers',
'_bin_widths',
'_edge_magnitudes',
]
for attr in attrs:
val = getattr(self, attr)
if val is None:
continue
val.ito(units)
@_new_obj
def to(self, units): # pylint: disable=invalid-name
"""Convert bin edges' units to `units`.
Parameters
----------
units : None, string, or pint.Unit
Returns
-------
new_binning : OneDimBinning
New binning object whose edges have units `units`
"""
if units is None:
units = 'dimensionless'
return {'bin_edges': self.bin_edges.to(ureg(str(units)))}
def __getattr__(self, attr):
return super().__getattribute__(attr)
# TODO: make this actually grab the bins specified (and be able to grab
# disparate bins, whether or not they are adjacent)... i.e., fill in all
# upper bin edges, and handle the case that it goes from linear or log
# to uneven (or if it stays lin or log, keep that attribute for the
# subselection). Granted, a OneDimBinning object right now requires
# monotonically-increasing and adjacent bins.
# TODO: make indexing allow for sequence containing a single ellipsis
# TODO: for some reason, this is crazy, crazy slow when indexing with
# ellipsis... why?
    # NOTE: maybe we don't care, since using ellipsis (or even an isolated,
# single colon) in a one-dimensional object is a "violation of the
# contract": http://stackoverflow.com/a/118508
@_new_obj
def __getitem__(self, index):
"""Return a new OneDimBinning, sub-selected by `index`.
Parameters
----------
index : int, slice, ellipsis, str, or length-one Sequence
The *bin indices* (not bin-edge indices) to return. Generated
OneDimBinning object must obey the usual rules (monotonic, etc.).
If a str is supplied it must match a name in bin_names
Returns
-------
A new OneDimBinning but only with bins selected by `index`.
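
        Examples
        --------
        A minimal sketch; the edges below are arbitrary:

        >>> dim = OneDimBinning('x', bin_edges=[0., 1., 2., 3.])
        >>> dim[0:2].num_bins
        2
        >>> dim[-1].num_bins
        1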
"""
        # Ellipsis: binning[...] returns everything
if index is Ellipsis:
return {}
magnitude = self.edge_magnitudes
units = self.units
orig_index = index
mylen = len(magnitude) - 1
bin_names = self.bin_names
# Deal with indexing by name first so as to not break anything else
if isinstance(index, str):
assert bin_names is not None
index = bin_names.index(index)
# Convert index/indices to positive-number sequence
if isinstance(index, slice):
index = list(range(*index.indices(mylen)))
if isinstance(index, int):
index = [index]
if isinstance(index, Iterable):
if not isinstance(index, Sequence):
index = list(index)
for bin_index in index:
if isinstance(bin_index, str):
raise ValueError('Slicing by seq of names currently not'
' supported')
if not index:
raise ValueError('`index` "%s" results in no bins being'
' specified.' %orig_index)
if len(index) > 1 and not np.all(np.diff(index) == 1):
raise ValueError('Bin indices must be monotonically'
' increasing and adjacent.')
new_edges = set()
new_names = []
for bin_index in index:
if bin_index < -mylen or bin_index >= mylen:
raise ValueError(
"Dimension '%s': bin index %s is invalid. Bin index"
" must be >= %+d and <= %+d"
%(self.name, bin_index, -mylen, mylen-1)
)
edge_ind0 = bin_index % mylen
edge_ind1 = edge_ind0 + 1
if bin_names is not None:
new_names.append(bin_names[edge_ind0])
mag0 = magnitude[edge_ind0]
mag1 = magnitude[edge_ind1]
new_edges = new_edges.union((mag0, mag1))
else:
raise TypeError('Unhandled index type %s' %type(orig_index))
if new_names == []:
new_names = None
# Retrieve current state; only bin_edges and bin_names need to be
# updated
new_edges = sorted(new_edges)
new_edges = np.array(new_edges)
new_edges = new_edges * units
return {'bin_edges': new_edges, 'bin_names': new_names}
def __iter__(self):
return self.iterbins()
def __eq__(self, other):
if not isinstance(other, OneDimBinning):
return False
return recursiveEquality(self.hashable_state, other.hashable_state)
def __ne__(self, other):
return not self.__eq__(other)
class MultiDimBinning(object):
# pylint: disable=line-too-long
r"""
Multi-dimensional binning object. This can contain one or more
OneDimBinning objects, and all subsequent operations (e.g. slicing) will
act on these in the order they are supplied.
Note that it is convenient to construct MultiDimBinning objects via the *
    operator (which implements the outer product) from multiple OneDimBinning
objects. See Examples below for details.
Parameters
----------
dimensions : OneDimBinning or sequence convertible thereto
Dimensions for the binning object. Indexing into the MultiDimBinning
object follows the order in which dimensions are provided.
See Also
--------
OneDimBinning : each item that is not a OneDimBinning object is passed to
this class to be instantiated as such.
Examples
--------
>>> from pisa import ureg
>>> from pisa.core.binning import MultiDimBinning, OneDimBinning
>>> ebins = OneDimBinning(name='energy', is_log=True,
... num_bins=40, domain=[1, 100]*ureg.GeV)
>>> czbins = OneDimBinning(name='coszen',
... is_lin=True, num_bins=4, domain=[-1, 0])
>>> mdb = ebins * czbins
>>> print(mdb)
MultiDimBinning(
OneDimBinning('energy', 40 logarithmically-regular bins spanning [1.0, 100.0] GeV (behavior is logarithmic)),
OneDimBinning('coszen', 4 linearly-regular bins spanning [-1.0, 0.0] (behavior is linear))
)
>>> print(mdb.energy)
OneDimBinning('energy', 40 logarithmically-regular bins spanning [1.0, 100.0] GeV (behavior is logarithmic))
>>> print(mdb[0, 0])
MultiDimBinning(
OneDimBinning('energy', 1 logarithmically-regular bin with edges at [1.0, 1.1220184543019633]GeV (behavior is logarithmic)),
OneDimBinning('coszen', 1 linearly-regular bin with edges at [-1.0, -0.75] (behavior is linear))
)
>>> print(mdb.slice(energy=2))
MultiDimBinning(
OneDimBinning('energy', 1 logarithmically-regular bin with edges at [1.2589254117941673, 1.4125375446227544]GeV (behavior is logarithmic)),
OneDimBinning('coszen', 4 linearly-regular bins spanning [-1.0, 0.0] (behavior is linear))
)
>>> smaller_binning = mdb[0:2, 0:3]
>>> map = smaller_binning.ones(name='my_map')
>>> print(map)
Map(name='my_map',
tex='{\\rm my\\_map}',
full_comparison=False,
hash=None,
parent_indexer=None,
binning=MultiDimBinning(
OneDimBinning('energy', 2 logarithmically-regular bins spanning [1.0, 1.2589254117941673] GeV (behavior is logarithmic)),
OneDimBinning('coszen', 3 linearly-regular bins spanning [-1.0, -0.25] (behavior is linear))
),
hist=array([[1., 1., 1.],
[1., 1., 1.]]))
"""
# pylint: enable=line-too-long
def __init__(self, dimensions, name=None, mask=None):
self.__map_class = None
if isinstance(dimensions, OneDimBinning):
dimensions = [dimensions]
if not isinstance(dimensions, Sequence):
if isinstance(dimensions, Mapping):
                if len(dimensions) == 1 and 'dimensions' in dimensions:
dimensions = dimensions['dimensions']
dimensions = [dimensions]
elif isinstance(dimensions, Iterable):
pass
else:
raise TypeError('`dimensions` unhandled type: %s'
% type(dimensions))
tmp_dimensions = []
for obj_num, obj in enumerate(dimensions):
if isinstance(obj, OneDimBinning):
one_dim_binning = obj
elif isinstance(obj, Mapping):
one_dim_binning = OneDimBinning(**obj)
else:
raise TypeError('Argument/object #%d unhandled type: %s'
%(obj_num, type(obj)))
tmp_dimensions.append(one_dim_binning)
self._dimensions = tuple(tmp_dimensions)
self._names = None
self._basenames = None
self._hash = None
self._num_dims = None
self._size = None
self._shape = None
self._hashable_state = None
self._mask_hash = None
self._coord = None
self._name = name
# Handle masking
self._init_mask(mask)
    def _init_mask(self, mask):
        '''
        Initialize the bin mask, which can be specified either as:
        (1) a boolean array matching the binning shape (i.e. a mask), where
            False means "masked off", or
        (2) a sequence of bin indices to mask off.
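
        A minimal sketch (arbitrary 2x2 binning); both forms below mask off
        the single bin at index (0, 1):

        >>> xb = OneDimBinning('x', num_bins=2, domain=[0, 2])
        >>> yb = OneDimBinning('y', num_bins=2, domain=[0, 2])
        >>> mdb = MultiDimBinning([xb, yb], mask=[(0, 1)])
        >>> mdb.mask
        array([[ True, False],
               [ True,  True]])
        >>> mdb2 = MultiDimBinning([xb, yb], mask=mdb.mask)
        >>> np.array_equal(mdb.mask, mdb2.mask)
        True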
'''
#TODO helper functions: get coords of masked bins, etc
# Bail out if no mask provided
if mask is None :
self._mask = None
return
# Check format of input `mask` arg
if isinstance(mask, np.ndarray) and (mask.dtype == bool) : # Is it a boolean array (e.g. a mask)?
#
# "Mask" case
#
# Just use the mask as provided
# Do some checks first
assert self.shape == mask.shape
else :
#
# "List of indices" case
#
# Get the indices
indices = mask
# Init a mask with all True
mask = np.full(self.shape, True, dtype=bool)
# Loop over indices and set those mask elements to False
for idx in indices :
try: # Handle index formatting and checks
mask[idx] = False
                except (IndexError, ValueError):
                    raise ValueError(f"Bin mask index {idx} not valid for binning shape {self.shape}")
# Done, store the mask
self._mask = mask
@property
def name(self):
"""Name of the dimension"""
return self._name
def __repr__(self):
previous_precision = np.get_printoptions()['precision']
np.set_printoptions(precision=18)
try:
argstrs = [('%s=%r' %item) for item in
self.serializable_state.items()]
r = '%s(%s)' %(self.__class__.__name__, ',\n '.join(argstrs))
finally:
np.set_printoptions(precision=previous_precision)
return r
def __str__(self):
b = ' x '.join(['%i (%s)'%(dim.num_bins, dim.name) for dim in self._dimensions])
return '"%s":\n%s'%(self.name, b)
def __pretty__(self, p, cycle):
"""Method used by the `pretty` library for formatting"""
if cycle:
p.text('%s(...)' % self.__class__.__name__)
else:
p.begin_group(4, '%s([' % self.__class__.__name__)
for n, dim in enumerate(self):
p.breakable()
p.pretty(dim)
if n < len(self)-1:
p.text(',')
p.end_group(4, '])')
def _repr_pretty_(self, p, cycle):
"""Method used by e.g. ipython/Jupyter for formatting"""
return self.__pretty__(p, cycle)
def __getstate__(self):
"""Method invoked during pickling"""
return self.serializable_state
def __setstate__(self, state):
"""Method invoked during unpickling"""
self.__init__(**state)
@property
def _map_class(self):
if self.__map_class is None:
from pisa.core.map import Map # pylint: disable=wrong-import-position
self.__map_class = Map
return self.__map_class
def to_json(self, filename, **kwargs):
"""Serialize the state to a JSON file that can be instantiated as a new
object later.
Parameters
----------
filename : str
Filename; must be either a relative or absolute path (*not
interpreted as a PISA resource specification*)
**kwargs
Further keyword args are sent to `pisa.utils.jsons.to_json()`
See Also
--------
from_json
Instantiate new object from the file written by this method
pisa.utils.jsons.to_json
"""
jsons.to_json(self.serializable_state, filename=filename, **kwargs)
@classmethod
def from_json(cls, resource):
"""Instantiate a new MultiDimBinning object from a JSON file.
        The format of the JSON is generated by the `MultiDimBinning.to_json`
        method, which converts a MultiDimBinning object to basic types and
        numpy arrays in a call to `pisa.utils.jsons.to_json`.
Parameters
----------
resource : str
A PISA resource specification (see pisa.utils.resources)
See Also
--------
to_json
pisa.utils.jsons.to_json
"""
state = jsons.from_json(resource)
return cls(**state)
@property
def names(self):
"""list of strings : names of each dimension contained"""
if self._names is None:
self._names = [dim.name for dim in self]
return self._names
@property
def basenames(self):
"""List of binning names with prefixes and/or suffixes along with any
number of possible separator characters removed. See function
`basename` for detailed specifications."""
if self._basenames is None:
self._basenames = [b.basename for b in self]
return self._basenames
@property
def basename_binning(self):
"""Identical binning but with dimensions named by their basenames.
Note that the `tex` properties for the dimensions are not carried over
into the new binning."""
return MultiDimBinning(d.basename_binning for d in self)
@property
def finite_binning(self):
"""Identical binning but with infinities in bin edges replaced by
largest/smallest floating-point numbers representable with the current
pisa.FTYPE."""
return MultiDimBinning(d.finite_binning for d in self)
@property
def dimensions(self):
"""tuple of OneDimBinning : each dimension's binning in a list"""
return self._dimensions
@property
def dims(self):
"""tuple of OneDimBinning : shortcut for `dimensions`"""
return self._dimensions
def iterdims(self):
"""Iterator over contained `dimensions`, each a OneDimBinning"""
return iter(self._dimensions)
@property
def num_dims(self):
"""int : number of dimensions"""
if self._num_dims is None:
self._num_dims = len(self._dimensions)
return self._num_dims
@property
def mask(self):
"""array : return the bin mask"""
return self._mask
@property
def shape(self):
"""tuple : shape of binning, akin to `nump.ndarray.shape`"""
if self._shape is None:
self._shape = tuple(b.num_bins for b in self._dimensions)
return self._shape
@property
def size(self):
"""int : total number of bins"""
if self._size is None:
self._size = reduce(mul, self.shape)
return self._size
@property
def coord(self):
"""namedtuple : coordinate for indexing into binning by dim names"""
if self._coord is None:
self._coord = namedtuple('coord', self.names)
return self._coord
@property
def normalize_values(self):
"""bool : Normalize quantities' units prior to hashing"""
        nv = [dim.normalize_values for dim in self]
        if not all(x == nv[0] for x in nv):
            raise ValueError(
                'Contained dimensions have `normalize_values` both True and'
                ' False. Set `normalize_values` to either True or False on'
                ' this MultiDimBinning object to force consistency among'
                ' contained OneDimBinning objects.'
            )
        return nv[0]
@normalize_values.setter
def normalize_values(self, b):
for dim in self:
dim.normalize_values = b
@property
def mask_hash(self):
"""Hash value based *solely* upon the mask.
"""
if self._mask_hash is None:
self._mask_hash = hash_obj(self.mask)
return self._mask_hash
@property
def serializable_state(self):
"""Attributes of the object that are stored to disk. Note that
attributes may be returned as references to other objects, so to
prevent external modification of those objects, the user must call
deepcopy() separately on the returned OrderedDict.
Returns
-------
state dict : OrderedDict
can be passed to instantiate a new MultiDimBinning via
`MultiDimBinning(**state)`
"""
d = OrderedDict({'dimensions': [d.serializable_state for d in self]})
d['name'] = self.name
d['mask'] = self.mask
return d
@property
def hashable_state(self):
"""Everything necessary to fully describe this object's state. Note
that objects may be returned by reference, so to prevent external
modification, the user must call deepcopy() separately on the returned
OrderedDict.
Returns
-------
state : OrderedDict that can be passed to instantiate a new
MultiDimBinning via MultiDimBinning(**state)
"""
if self._hashable_state is None:
state = OrderedDict()
# TODO: Shouldn't order matter?
#state['dimensions'] = [self[name]._hashable_state
# for name in sorted(self.names)]
state['dimensions'] = [d.hashable_state for d in self]
state['name'] = self.name
mask_hash = self.mask_hash
if mask_hash is not None :
state['mask_hash'] = mask_hash
self._hashable_state = state
return self._hashable_state
@property
def normalized_state(self):
"""OrderedDict containing normalized (base units, and rounded to
appropriate precision) state attributes used for testing equality
between two objects.
Use `hashable_state` for faster equality checks and `normalized_state`
for inspecting the contents of each state attribute pre-hashing
"""
state = OrderedDict()
state['dimensions'] = [d.normalized_state for d in self]
state['mask'] = self.mask
return state
@property
def hash(self):
"""Unique hash value for this object"""
if self._hash is None:
self._hash = hash_obj(self.hashable_state)
return self._hash
def __hash__(self):
return self.hash
@property
def edges_hash(self):
"""int : hash on the list of hashes for each dimension's edge values"""
return hash_obj([d.edges_hash for d in self])
@property
def bin_edges(self):
"""Return a list of the contained dimensions' bin_edges that is
compatible with the numpy.histogramdd `hist` argument."""
return [d.bin_edges for d in self]
@property
def is_irregular(self):
"""Returns `True` if any of the 1D binnings is irregular."""
return np.any([d.is_irregular for d in self])
@property
def is_lin(self):
"""Returns `True` iff all dimensions are linear."""
return np.all([d.is_lin for d in self])
@property
def is_log(self):
"""Returns `True` iff all dimensions are log."""
return np.all([d.is_log for d in self])
@property
def domains(self):
"""Return a list of the contained dimensions' domains"""
return [d.domain for d in self]
@property
def midpoints(self):
"""Return a list of the contained dimensions' midpoints"""
return [d.midpoints for d in self]
@property
def weighted_centers(self):
"""Return a list of the contained dimensions' weighted_centers (e.g.
equidistant from bin edges on logarithmic scale, if the binning is
logarithmic; otherwise linear). Access `midpoints` attribute for
always-linear alternative."""
return [d.weighted_centers for d in self]
@property
def num_bins(self):
"""
        Return a list of the contained dimensions' num_bins.
        Note that this does not account for any bin mask (it is computed per
        dimension).
"""
return [d.num_bins for d in self]
@property
def tot_num_bins(self):
"""
Return total number of bins.
If a bin mask is used, this will only count bins that are not masked off
"""
        if self.mask is None:
            return np.prod(self.shape)
        else:
            return np.sum(self.mask.astype(int))
@property
def units(self):
"""list : Return a list of the contained dimensions' units"""
return [d.units for d in self]
@units.setter
def units(self, *args):
"""sequence or *args containing units for each contained dim"""
self.ito(*args[0])
@property
def inbounds_criteria(self):
"""Return string boolean criteria indicating e.g. an event falls within
the limits of the defined binning.
This can be used for e.g. applying cuts to events.
See Also
--------
pisa.core.events.keepEventsInBins
"""
crit = '(%s)' %(' & '.join(dim.inbounds_criteria for dim in self))
return crit
def index(self, dim, use_basenames=False):
"""Find dimension implied by `dim` and return its integer index.
Parameters
----------
dim : int, string, OneDimBinning
            An integer index, dimension name, or identical OneDimBinning object
to locate within the contained dimensions
use_basenames : bool
Dimension names are only compared after pre/suffixes are stripped,
allowing for e.g. `dim`='true_energy' to find 'reco_energy'.
Returns
-------
idx : integer
index of the dimension corresponding to `dim`
Raises
------
ValueError if `dim` cannot be found
"""
names = self.basenames if use_basenames else self.names
if isinstance(dim, OneDimBinning):
d = dim.basename if use_basenames else dim.name
try:
idx = names.index(d)
            except ValueError:
                raise ValueError(
                    'Dimension %s not present. Valid dimensions are in range %s'
                    % (d, [0, len(self)-1])
                )
elif isinstance(dim, str):
d = basename(dim) if use_basenames else dim
try:
idx = names.index(d)
except ValueError:
what = 'basename' if use_basenames else 'name'
raise ValueError(
"Dimension %s '%s' not present. Valid dimension %ss are %s"
%(what, d, what, names)
)
elif isinstance(dim, int):
if dim < 0 or dim >= len(self):
raise ValueError(
'Dimension %d does not exist. Valid dimensions indices'
' are in the range %s.' %(dim, [0, len(self)-1])
)
idx = dim
else:
raise TypeError('Unhandled type for `dim`: "%s"' %type(dim))
return idx
def remove(self, dims):
"""Remove dimensions.
Parameters
----------
dims : str, int, or sequence thereof
Dimensions to be removed
Returns
-------
binning : MultiDimBinning
Identical binning as this but with `dims` removed.
"""
if isinstance(dims, (str, int)):
dims = [dims]
keep_idx = list(range(len(self)))
for dim in dims:
idx = self.index(dim)
keep_idx.remove(idx)
keep_dims = [deepcopy(self.dimensions[idx]) for idx in keep_idx]
return MultiDimBinning(keep_dims)
# TODO: add *args to handle positional indexing (?) (also would need to
    # add this to `slice` method if implemented.)
def indexer(self, **kwargs):
"""Any dimension index/slice not specified by name in kwargs will
default to ":" (all elements).
Parameters
        ----------
**kwargs
kwargs are names of dimension(s) and assigned to these are either
an integer index into that dimension or a Python `slice` object for
that dimension. See examples below for details.
Returns
-------
indexer : tuple
See Also
--------
broadcast
Assignment of a one-dimensional array to a higher-dimensional array
is simplified greatly by using `broadcast` in conjunction with
`indexer` or `pisa.core.map.Map.slice`. See examples in
docs for `broadcast`.
broadcaster
Similar to `broadcast`, but returns a tuple that can be applied to
broadcast any one-dimensional array.
slice
Apply the `indexer` returned by this method to this MultiDimBinning
object, returning a new MultiDimBinning object.
pisa.core.map.Map.slice
Same operation, but slices a Map object by dimension-name
(internally, calls `indexer`).
Examples
--------
>>> from pisa import ureg
>>> from pisa.core.binning import MultiDimBinning, OneDimBinning
>>> ebins = OneDimBinning(name='energy', is_log=True,
... num_bins=40, domain=[1, 80]*ureg.GeV)
>>> czbins = OneDimBinning(name='coszen',
... is_lin=True, num_bins=4, domain=[-1, 0])
>>> mdb = ebins * czbins
>>> print(mdb.indexer(energy=0))
(0, slice(None, None, None))
Omitting a dimension (coszen in the above) is equivalent to slicing
with a colon (i.e., `(0, slice(None))`):
>>> print(mdb.indexer(energy=0, coszen=slice(None)))
(0, slice(None, None, None))
>>> print(mdb.indexer(energy=slice(None), coszen=1))
(slice(None, None, None), 1)
Now create an indexer to use on a Numpy array:
>>> import numpy as np
>>> x = np.random.RandomState(0).uniform(size=mdb.shape)
>>> indexer = mdb.indexer(energy=slice(0, 5), coszen=1)
>>> print(x[indexer])
[0.71518937 0.64589411 0.38344152 0.92559664 0.83261985]
"""
indexer = []
for dim in self.dims:
if dim.name in kwargs:
val = kwargs[dim.name]
if isinstance(val, str):
val = dim.index(val)
indexer.append(val)
else:
indexer.append(slice(None))
return tuple(indexer)
def slice(self, **kwargs):
"""Slice the binning by dimension name. Any dimension/index not
specified by name in kwargs will default to ":" (all bins).
Uses `indexer` internally to define the indexing tuple.
Returns
-------
sliced_binning : MultiDimBinning
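Examples
--------
A minimal sketch; a small (3, 2)-bin binning is defined inline:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2, 3])
>>> y = OneDimBinning('y', bin_edges=[0, 1, 2])
>>> (x * y).slice(x=slice(0, 2)).shape
(2, 2)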
"""
return self[self.indexer(**kwargs)]
def broadcast(self, a, from_dim, to_dims):
"""Take a one-dimensional array representing one input dimension and
broadcast it across some number of output dimensions.
Parameters
----------
a : 1D array
Data from the `from_dim` dimension. `a` must have same length as
the dimension it comes from (or Numpy must be able to automatically
cast it into this dimension).
from_dim : string
Name of dimension that the data in `a` comes from.
to_dims : string or iterable of strings
Dimension(s) to cast `a` into.
Returns
-------
a_broadcast : array
Broadcast version of `a`
See Also
--------
broadcaster
The method used internally to derive the tuple used to broadcast
the array. This can be used directly to return the broadcaster for
use on other Maps or Numpy arrays.
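Examples
--------
A hedged sketch: broadcast two per-`x` weights across a (2, 2) grid:
>>> import numpy as np
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[-1, 0, 1])
>>> w = np.array([2.0, 3.0])
>>> (x * y).broadcast(w, from_dim='x', to_dims='y').shape
(2, 1)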
"""
assert isinstance(a, np.ndarray)
a_shape = a.shape
assert len(a_shape) == 1
return a[self.broadcaster(from_dim=from_dim, to_dims=to_dims)]
def broadcaster(self, from_dim, to_dims):
"""Generate an indexder that, if applied to a one-dimensional array
representing data from one dimension, broadcasts that array into some
number of other dimensions.
Parameters
----------
from_dim : string
Name of the dimension the data comes from.
to_dims : string or iterable of strings
Dimension(s) to cast into.
Returns
-------
bcast : tuple
Tuple that can be applied to a Numpy array for purposes of
broadcasting it. E.g. use as `np.array([0,1,2])[bcast]`.
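Examples
--------
A sketch, using the same small dimensions as in `broadcast` above; note
that `None` is how `np.newaxis` displays:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[-1, 0, 1])
>>> (x * y).broadcaster(from_dim='x', to_dims='y')
(slice(None, None, None), None)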
"""
if isinstance(to_dims, str):
to_dims = [to_dims]
bcast = []
for name in self.names:
if name == from_dim:
bcast.append(slice(None))
elif name in to_dims:
bcast.append(np.newaxis)
return tuple(bcast)
def iterbins(self):
"""Return an iterator over each N-dimensional bin. The elments returned
by the iterator are each a MultiDimBinning, just containing a single
bin.
Returns
-------
bin_iterator
See Also
--------
index2coord
convert the (flat) index to multi-dimensional coordinate, which is
useful when using e.g. `enumerate(iterbins)`
"""
return (MultiDimBinning(dims) for dims in product(*self.dims))
def iteredgetuples(self):
"""Return an iterator over each bin's edges. The elments returned by
the iterator are a tuple of tuples, where the innermost tuples
correspond to each dimension (in the order they're defined here).
Units are stripped prior to iteration for purposes of speed.
Note that this method is, according to one simple test, about 5000x
faster than `iterbins`.
Returns
-------
edges_iterator
See Also
--------
iterbins
Similar, but returns a OneDimBinning object for each bin. This is
slower than `iteredgetuples` but easier to work with.
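Examples
--------
A short sketch: a (2, 1)-bin binning yields one edge tuple per bin:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 10])
>>> len(list((x * y).iteredgetuples()))
2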
"""
return product(*(dim.iteredgetuples() for dim in self.dims))
def itercoords(self):
"""Return an iterator over each N-dimensional coordinate into the
binning. The elements returned by the iterator are each a namedtuple,
which can be used to directly index into the binning.
Returns
-------
coord_iterator
See Also
--------
iterbins
Iterator over each bin
index2coord
convert the (flat) index to multi-dimensional coordinate, which is
useful when using e.g. `enumerate(iterbins)`
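Examples
--------
An illustrative sketch; coordinates are emitted in flat (C-order)
sequence:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 1])
>>> [tuple(c) for c in (x * y).itercoords()]
[(0, 0), (1, 0)]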
"""
return (self.index2coord(i) for i in range(self.size))
def index2coord(self, index):
"""Convert a flat index into an N-dimensional bin coordinate.
Useful in conjunction with `enumerate(iterbins)`
Parameters
----------
index : integer
The flat index
Returns
-------
coord : self.coord namedtuple
Coordinates are in the same order as the binning is here defined
and each coordinate is named by its corresponding dimension.
Therefore integer indexing into `coord` as well as named indexing
are possible.
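Examples
--------
A sketch for a (2, 3)-shaped binning; flat index 4 maps to (1, 1) in
row-major (C) order:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 1, 2, 3])
>>> tuple((x * y).index2coord(4))
(1, 1)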
"""
coord = []
quot = index
for dim_length in self.shape[::-1]:
quot, rem = divmod(quot, dim_length)
coord.append(rem)
return self.coord(*coord[::-1]) # pylint: disable=not-callable
# TODO: examples!
def reorder_dimensions(self, order, use_deepcopy=False,
use_basenames=False):
"""Return a new MultiDimBinning object with dimensions ordered
according to `order`.
Parameters
----------
order : MultiDimBinning or sequence of string, int, or OneDimBinning
Order of dimensions to use. Strings are interpreted as dimension
basenames, integers are interpreted as dimension indices, and
OneDimBinning objects are interpreted by their `basename`
attributes (so e.g. the exact binnings in `order` do not have to
match this object's exact binnings; only their basenames). Note
that a MultiDimBinning object is a valid sequence type to use for
`order`.
Notes
-----
Dimensions specified in `order` that are not in this object are
ignored, but dimensions in this object that are missing in `order`
result in an error.
Returns
-------
MultiDimBinning object with reordered dimensions.
Raises
------
ValueError if dimensions present in this object are missing from
`order`.
Examples
--------
>>> b0 = MultiDimBinning(...)
>>> b1 = MultiDimBinning(...)
>>> b2 = b0.reorder_dimensions(b1)
>>> print(b2.names)
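A concrete sketch with two single-bin dimensions defined inline:
>>> x = OneDimBinning('x', bin_edges=[0, 1])
>>> y = OneDimBinning('y', bin_edges=[0, 1])
>>> print((x * y).reorder_dimensions(['y', 'x']).names)
['y', 'x']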
"""
if hasattr(order, 'binning') and isinstance(order.binning,
MultiDimBinning):
order = order.binning.dims
elif isinstance(order, MultiDimBinning):
order = order.dims
indices = []
for dim in order:
try:
idx = self.index(dim, use_basenames=use_basenames)
except ValueError:
continue
indices.append(idx)
if set(indices) != set(range(len(self))):
raise ValueError(
'Invalid `order`: Only a subset of the dimensions present'
' were specified. `order`=%s, but dimensions=%s'
%(order, self.names)
)
if use_deepcopy:
new_dimensions = [deepcopy(self._dimensions[n]) for n in indices]
else:
new_dimensions = [self._dimensions[n] for n in indices]
new_binning = MultiDimBinning(new_dimensions)
return new_binning
def is_compat(self, other):
"""Check if another binning is compatible with this binning.
Note that for now, only downsampling is allowed from other to this, and
not vice versa.
Parameters
----------
other : MultiDimBinning
Returns
-------
is_compat : bool
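Examples
--------
A hedged sketch: a coarse binning whose edges all exist in a finer
binning is compatible with it (downsampling), but not vice versa:
>>> fine = MultiDimBinning([OneDimBinning('x', bin_edges=[0, 1, 2, 3, 4])])
>>> coarse = MultiDimBinning([OneDimBinning('x', bin_edges=[0, 2, 4])])
>>> coarse.is_compat(fine)
True
>>> fine.is_compat(coarse)
False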
"""
if not set(self.names) == set(other.names):
logging.trace('dimension names do not match')
return False
for name in self.names:
if not self[name].is_compat(other[name]):
return False
return True
def oversample(self, *args, **kwargs):
"""Return a MultiDimBinning object oversampled relative to this one.
Parameters
----------
*args : each factor an int
Factors by which to oversample the binnings. There must either be
one factor (one arg)--which will be broadcast to all dimensions--or
there must be as many factors (args) as there are dimensions.
If positional args are specified (i.e., non-kwargs), then kwargs
are forbidden. For more detailed control, use keyword arguments to
specify the dimension(s) to be oversampled and their factors.
**kwargs : name=factor pairs
Dimensions not specified default to oversample factor of 1 (i.e.,
no oversampling)
Returns
-------
new_binning : MultiDimBinning
New binning, oversampled from the current binning.
Notes
-----
You can either specify oversampling by passing in args (ordered values,
no keywords) or kwargs (order doesn't matter, but uses keywords), but
not both.
Specifying simple args (no keywords) requires either a single scalar
(in which case all dimensions will be oversampled by the same factor)
or one scalar per dimension (which oversamples the dimensions in the
order specified).
Specifying keyword args is far more explicit (and general), where each
dimension's oversampling can be specified by name=factor pairs, but not
every dimension must be specified (where no oversampling is applied to
unspecified dimensions).
See Also
--------
downsample
Similar to this, but downsample the MultiDimBinning
OneDimBinning.oversample
Oversample a OneDimBinning object; this method is called to
actually perform the oversampling for each dimension within this
MultiDimBinning object
OneDimBinning.downsample
Same, but downsample a OneDimBinning object
Examples
--------
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 20])
>>> mdb = x * y
The following are all equivalent:
>>> print(mdb.oversample(2))
MultiDimBinning(
OneDimBinning('x', 4 linearly-regular bins spanning [0.0, 2.0] (behavior is linear)),
OneDimBinning('y', 2 linearly-regular bins spanning [0.0, 20.0] (behavior is linear))
)
>>> print(mdb.oversample(2, 2))
MultiDimBinning(
OneDimBinning('x', 4 linearly-regular bins spanning [0.0, 2.0] (behavior is linear)),
OneDimBinning('y', 2 linearly-regular bins spanning [0.0, 20.0] (behavior is linear))
)
>>> print(mdb.oversample(x=2, y=2))
MultiDimBinning(
OneDimBinning('x', 4 linearly-regular bins spanning [0.0, 2.0] (behavior is linear)),
OneDimBinning('y', 2 linearly-regular bins spanning [0.0, 20.0] (behavior is linear))
)
But with kwargs, you can specify only the dimensions you want to
oversample, and the other dimension(s) remain unchanged:
>>> print(mdb.oversample(y=5))
MultiDimBinning(
OneDimBinning('x', 2 linearly-regular bins spanning [0.0, 2.0] (behavior is linear)),
OneDimBinning('y', 5 linearly-regular bins spanning [0.0, 20.0] (behavior is linear))
)
"""
assert self.mask is None, "`oversample` function does not currenty support bin masking"
if args:
assert len(args) in [1, len(self)]
elif kwargs:
for name in self.names:
if name not in kwargs:
kwargs[name] = 1
factors = self._args_kwargs_to_list(*args, **kwargs)
new_binning = [dim.oversample(f)
for dim, f in zip(self._dimensions, factors)]
return MultiDimBinning(new_binning)
def downsample(self, *args, **kwargs):
"""Return a Binning object downsampled relative to this binning.
Parameters
----------
*args : each factor an int
Factors by which to downsample the binnings. There must either be
one factor (one arg)--which will be broadcast to all dimensions--or
there must be as many factors (args) as there are dimensions.
If positional args are specified (i.e., non-kwargs), then kwargs
are forbidden.
**kwargs : name=factor pairs
Dimensions not specified default to downsample factor of 1 (i.e., no
downsampling)
Returns
-------
new_binning : MultiDimBinning
New binning, downsampled from the current binning.
Notes
-----
Can either specify downsampling by passing in args (ordered values, no
keywords) or kwargs (order doesn't matter, but uses keywords), but not
both.
See Also
--------
oversample
Oversample (upsample) the MultiDimBinning
OneDimBinning.downsample
The method actually called to perform the downsampling for each
OneDimBinning within this MultiDimBinning object.
OneDimBinning.oversample
Same, but oversample (upsample) a OneDimBinning object
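Examples
--------
A short sketch mirroring the `oversample` examples:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2, 3, 4])
>>> y = OneDimBinning('y', bin_edges=[0, 10, 20])
>>> (x * y).downsample(4, 2).shape
(1, 1)
>>> (x * y).downsample(x=2).shape
(2, 2)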
"""
assert self.mask is None, "`downsample` function does not currenty support bin masking"
if args:
assert len(args) in [1, len(self)]
elif kwargs:
for name in self.names:
if name not in kwargs:
kwargs[name] = 1
factors = self._args_kwargs_to_list(*args, **kwargs)
new_binning = [dim.downsample(f)
for dim, f in zip(self._dimensions, factors)]
return MultiDimBinning(new_binning)
def assert_array_fits(self, array):
"""Check if a 2D array of values fits into the defined bins (i.e., has
the exact shape defined by this binning).
Parameters
----------
array : 2D array (or sequence-of-sequences)
Returns
-------
fits : bool, True if array fits or False otherwise
Raises
------
ValueError if array shape does not match the binning shape
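Examples
--------
A sketch: a correctly-shaped array passes silently; a wrong shape
raises:
>>> import numpy as np
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 1])
>>> (x * y).assert_array_fits(np.zeros((2, 1)))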
"""
if array.shape != self.shape:
raise ValueError(
'Array shape %s does not match binning shape %s'
% (array.shape, self.shape)
)
def assert_compat(self, other):
"""Check if a (possibly different) binning can map onto the defined
binning. Allows for simple re-binning schemes (but no interpolation).
Parameters
----------
other : Binning or container with attribute "binning"
Returns
-------
compat : bool
"""
if not isinstance(other, MultiDimBinning):
for val in other.__dict__.values():
if isinstance(val, MultiDimBinning):
other = val
break
assert isinstance(other, MultiDimBinning), str(type(other))
if other == self:
return True
for my_dim, other_dim in zip(self, other):
if not my_dim.assert_compat(other_dim):
return False
return True
def squeeze(self):
"""Remove any singleton dimensions (i.e. that have only a single bin).
Analagous to `numpy.squeeze`.
Returns
-------
MultiDimBinning with only non-singleton dimensions
"""
assert self.mask is None, "`squeeze` function does not currenty support bin masking"
return MultiDimBinning(d for d in self if len(d) > 1)
def _args_kwargs_to_list(self, *args, **kwargs):
"""Take either args or kwargs (but not both) and convert into a simple
sequence of values. Broadcasts a single arg to all dimensions."""
if not np.logical_xor(len(args), len(kwargs)):
raise ValueError('Either args (values specified by order and not'
' specified by name) or kwargs (values specified'
' by name=value pairs) can be used, but not'
' both.')
if len(args) == 1:
return [args[0]]*self.num_dims
if len(args) > 1:
if len(args) != self.num_dims:
raise ValueError('Specified %s args, but binning is'
' %s-dim.' %(len(args), self.num_dims))
return args
if set(kwargs.keys()) != set(self.names):
raise ValueError('Specified dimensions "%s" but this has'
' dimensions "%s"' %(sorted(kwargs.keys()),
self.names))
return [kwargs[name] for name in self.names]
def ito(self, *args, **kwargs):
"""Convert units in-place. Cf. Pint's `ito` method."""
units_list = self._args_kwargs_to_list(*args, **kwargs)
for dim, units in zip(self.iterdims(), units_list):
dim.ito(units)
def to(self, *args, **kwargs): # pylint: disable=invalid-name
"""Convert the contained dimensions to the passed units. Unspecified
dimensions will be omitted.
"""
units_list = self._args_kwargs_to_list(*args, **kwargs)
new_binnings = [dim.to(units)
for dim, units in zip(self.iterdims(), units_list)]
return MultiDimBinning(new_binnings)
def meshgrid(self, entity, attach_units=True):
"""Apply NumPy's meshgrid method on various entities of interest.
Parameters
----------
entity : string
Can be any attribute of OneDimBinning that returns a 1D array with
units. E.g., one of 'midpoints', 'weighted_centers', 'bin_edges',
'bin_widths', or 'weighted_bin_widths'
attach_units : bool
Whether to attach units to the result (can save computation time by
not doing so).
Returns
-------
[X1, X2,..., XN] : list of numpy ndarrays or Pint quantities of the same shape
One ndarray or quantity is returned per dimension; see docs for
`numpy.meshgrid` for details
See Also
--------
numpy.meshgrid
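Examples
--------
A hedged sketch: a midpoints meshgrid for a (2, 2) binning, units
stripped:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 2, 4])
>>> X, Y = (x * y).meshgrid(entity='midpoints', attach_units=False)
>>> X.shape, Y.shape
((2, 2), (2, 2))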
"""
entity = entity.lower().strip()
arrays = []
units = []
for dim in self.iterdims():
try:
quantity_array = getattr(dim, entity)
except AttributeError:
logging.error(
"Dimension %s does not contain entity '%s'", dim.name, entity
)
raise
if attach_units:
units.append(quantity_array.units)
arrays.append(quantity_array.magnitude)
# NOTE: in numpy versions prior to 1.13.0, meshgrid returned float64
# arrays even if the inputs were float32. Use `astype` as a fix. Note
# that since `astype` creates a copy of the array even if the dtype of
# the input is unchanged, setting `copy=False` in the call to meshgrid
# is ok; i.e., if a user modifies an element of the returned array, it
# does not affect the original `entity` from which the meshgrid was
# generated.
mg = [a.astype(FTYPE) for a in np.meshgrid(*arrays, indexing='ij', copy=False)]
if attach_units:
return [m*u for m, u in zip(mg, units)]
return mg
# TODO: modify technique depending upon grid size for memory concerns, or
# even take a `method` argument to force method manually.
def bin_volumes(self, attach_units=True):
"""Bin "volumes" defined in `num_dims`-dimensions
Parameters
----------
attach_units : bool
Whether to attach pint units to the resulting array
Returns
-------
volumes : array
Bin volumes
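Examples
--------
A sketch: volumes are outer products of per-dimension bin widths;
widths [1, 2] and [10] give a total volume of 30:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 3])
>>> y = OneDimBinning('y', bin_edges=[0, 10])
>>> print((x * y).bin_volumes(attach_units=False).sum())
30.0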
"""
meshgrid = self.meshgrid(entity='bin_widths', attach_units=False)
volumes = reduce(mul, meshgrid)
if attach_units:
volumes *= reduce(mul, (ureg(str(d.units)) for d in self.iterdims()))
return volumes
def weighted_bin_volumes(self, attach_units=True):
"""Bin "volumes" defined in `num_dims`-dimensions, but unlike
`bin_volumes`, the volume is evaluated in the space of the binning.
E.g., logarithmic bins have `weighted_bin_volumes` of equal size in
log-space.
Parameters
----------
attach_units : bool
Whether to attach pint units to the resulting array
Returns
-------
volumes : array
Bin volumes
"""
meshgrid = self.meshgrid(entity='weighted_bin_widths', attach_units=False)
volumes = reduce(mul, meshgrid)
if attach_units:
# NOTE we use the units from `weighted_bin_widths` because these
# can be different from those of the dimension
volumes *= reduce(
mul,
(ureg(str(d.weighted_bin_widths.units)) for d in self.iterdims()),
)
return volumes
def empty(self, name, map_kw=None, **kwargs):
"""Return a Map whose hist is an "empty" numpy ndarray with same
dimensions as this binning.
The contents are not _actually_ empty, just undefined. Therefore be
careful to populate the array prior to using its contents.
Parameters
----------
name : string
Name of the Map
map_kw : None or dict
keyword arguments sent to instantiate the new Map (except `name`
which is specified above)
**kwargs
keyword arguments passed on to numpy.empty() (except `shape` which
must be omitted)
Returns
-------
map : Map
"""
assert 'shape' not in kwargs
if map_kw is None:
map_kw = {}
if 'dtype' not in kwargs:
kwargs['dtype'] = FTYPE
hist = np.empty(self.shape, **kwargs)
return self._map_class(name=name, hist=hist, binning=self, **map_kw) # pylint: disable=not-callable
def zeros(self, name, map_kw=None, **kwargs):
"""Return a numpy ndarray filled with 0's with same dimensions as this
binning.
Parameters
----------
name : string
Name of the map
map_kw : None or dict
keyword arguments sent to instantiate the new Map (except `name`
which is specified above)
**kwargs
keyword arguments passed on to numpy.zeros() (except `shape` which
must be omitted)
Returns
-------
map : Map
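Examples
--------
A hedged sketch (assumes pisa.core.map.Map is importable, as it
provides the `_map_class` used here):
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2])
>>> y = OneDimBinning('y', bin_edges=[0, 1])
>>> m = (x * y).zeros(name='counts')
>>> m.hist.shape
(2, 1)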
"""
assert 'shape' not in kwargs
if map_kw is None:
map_kw = {}
if 'dtype' not in kwargs:
kwargs['dtype'] = FTYPE
hist = np.zeros(self.shape, **kwargs)
return self._map_class(name=name, hist=hist, binning=self, **map_kw) # pylint: disable=not-callable
def ones(self, name, map_kw=None, **kwargs):
"""Return a numpy ndarray filled with 1's with same dimensions as this
binning.
Parameters
----------
name : string
Name of the map
map_kw : None or dict
keyword arguments sent to instantiate the new Map (except `name`
which is specified above)
**kwargs
keyword arguments passed on to numpy.ones() (except `shape` which
must be omitted)
Returns
-------
map : Map
"""
assert 'shape' not in kwargs
if map_kw is None:
map_kw = {}
if 'dtype' not in kwargs:
kwargs['dtype'] = FTYPE
hist = np.ones(self.shape, **kwargs)
return self._map_class(name=name, hist=hist, binning=self, **map_kw) # pylint: disable=not-callable
def full(self, fill_value, name, map_kw=None, **kwargs):
"""Return a map whose `hist` is filled with `fill_value` of same
dimensions as this binning.
Parameters
----------
fill_value
Value with which to fill the map
name : string
Name of the map
map_kw : None or dict
keyword arguments sent to instantiate the new Map (except `name`
which is specified above)
**kwargs
keyword arguments passed on to numpy.full() (except `shape` and
`fill_value`, which must be omitted)
Returns
-------
map : Map
"""
assert 'shape' not in kwargs
if map_kw is None:
map_kw = {}
if 'dtype' not in kwargs:
kwargs['dtype'] = FTYPE
hist = np.full(self.shape, fill_value, **kwargs)
return self._map_class(name=name, hist=hist, binning=self, **map_kw) # pylint: disable=not-callable
def __contains__(self, x):
if isinstance(x, OneDimBinning):
return x in self.dims
if isinstance(x, str):
return x in self.names
return False
def __eq__(self, other):
if not isinstance(other, MultiDimBinning):
return False
return recursiveEquality(self.hashable_state, other.hashable_state)
# TODO: remove this method, as it should just be considered an outer
# product to increase dimensionality (i.e. the "*" operator, or __mul__
# makes more sense than "+" or __add__)?
def __add__(self, other):
other = MultiDimBinning(other)
return MultiDimBinning(chain(self, other))
def __mul__(self, other):
if isinstance(other, (Mapping, OneDimBinning)):
other = [other]
other = MultiDimBinning(other)
return MultiDimBinning(chain(self, other))
# TODO: should __getattr__ raise its own exception if the attr is not found
# as a dimension rather than call parent's __getattribute__ method, since
# presumably that already failed?
def __getattr__(self, attr):
# If you've gotten here, __getattribute__ has failed. Try to get the
# attr as a contained dimension:
try:
return self.__getitem__(attr)
except (KeyError, ValueError):
# If that failed, re-run parent's __getattribute__ which will raise
# an appropriate exception
return super().__getattribute__(attr)
# TODO: refine handling of ellipsis such that the following work as in
# Numpy:
# * ['dim0', 'dim3', ...]
# * ['dim0', 3, ...]
# * [...]
# * [0, ...]
# * [..., 2]
# * [..., 2, 1, 4]
def __getitem__(self, index):
"""Interpret indices as indexing bins and *not* bin edges.
Indices refer to dimensions in the same order in which they were
specified at instantiation, and all dimensions must be present.
Parameters
----------
index : str, int, len-N-sequence of ints, or len-N-sequence of slices
If str is passed: Return the binning corresponding to the named
dimension
If an integer is passed:
* If num_dims is 1, `index` indexes into the bins of the sole
OneDimBinning. The bin is returned.
* If num_dims > 1, `index` indexes which contained OneDimBinning
object to return.
If a len-N-sequence of integers or slices is passed, dimensions are
indexed by these in the order in which dimensions are stored
internally.
Returns
-------
A new MultiDimBinning object with the bins specified by `index`.
Whether or not behavior is logarithmic is unchanged.
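Examples
--------
A hedged sketch; integer indices keep dimensionality, since they are
converted to single-bin slices:
>>> x = OneDimBinning('x', bin_edges=[0, 1, 2, 3])
>>> y = OneDimBinning('y', bin_edges=[0, 1, 2])
>>> mdb = x * y
>>> mdb[0, :].shape
(1, 2)
>>> len(mdb['y'])
2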
"""
# If ellipsis, return everything
if index is Ellipsis:
return self
# If the arg is a string, it should name a dimension; check whether it
# does, and if so return the OneDimBinning for that dimension
if isinstance(index, str):
# Check there is no mask defined (a mask does not make sense on a
# per-dimension basis, so this operation cannot be applied when masking
# is involved)
# TODO: Removed for now as it causes issues; need to come back and
# think about this...
# if self.mask is not None:
#     raise ValueError("Cannot extract a single binning dimension when a mask is used")
# TODO: The check above gets called during deepcopy with
# index == '__deepcopy__'?! If an assert is used instead of raising
# ValueError, everything fails. Not sure what is going on here...
# Find the dimension
for d in self.iterdims():
if d.name == index:
return d
raise ValueError(f"index '{index}' not in {self.names}")
# TODO: implement a "linearization" like np.flatten() to iterate
# through each bin individually without hassle for the user...
#if self.num_dims == 1 and np.isscalar(index):
# return self._dimensions[0]
# If here, the index is a numerical, numpy-compatible index
# Do some pre-processing on it to get it into the right format...
if isinstance(index, Iterable) and not isinstance(index, Sequence):
index = list(index)
if not isinstance(index, Sequence):
index = [index]
# Turn any integer index into an equivalent single-element slice; this
# ensures the array dimensionality is maintained
if isinstance(index, tuple):
index = list(index)
for i, idx in enumerate(index):
# Note: for idx == -1 the stop must be None, since slice(-1, 0)
# selects nothing
if isinstance(idx, int) and idx != -1: index[i] = slice(idx, idx+1, None)
elif isinstance(idx, int): index[i] = slice(idx, None, None)
elif isinstance(idx, slice): index[i] = idx
else: raise ValueError('Binning index is %s; an int or slice is needed' % idx)
# Get the new binning, based on the index
input_dim = len(index)
if input_dim != self.num_dims:
raise ValueError('Binning is %dD, but %dD indexing was passed'
%(self.num_dims, input_dim))
new_binning = {'dimensions': [dim[idx] for dim, idx in
zip(self.iterdims(), index)]}
# Also update mask, if there is one
new_mask = None if self.mask is None else self.mask[index]
return MultiDimBinning(**new_binning, mask=new_mask)
def __iter__(self):
"""Iterate over dimensions. Use `iterbins` to iterate over bins."""
return iter(self._dimensions)
def __len__(self):
return self.num_dims
def __ne__(self, other):
return not self.__eq__(other)
def test_OneDimBinning():
"""Unit tests for OneDimBinning class"""
# pylint: disable=line-too-long, wrong-import-position
import pickle
import os
import shutil
import tempfile
# needed so that eval(repr(b)) works
from numpy import array, float32, float64 # pylint: disable=unused-variable
b1 = OneDimBinning(name='true_energy', num_bins=40, is_log=True,
domain=[1, 80]*ureg.GeV, tex=r'E_{\rm true}',
bin_names=[str(i) for i in range(40)])
b2 = OneDimBinning(name='coszen', num_bins=40, is_lin=True,
domain=[-1, 1], bin_names=None,
tex=r'\cos\theta')
b3 = OneDimBinning(name='reco_energy', num_bins=40, is_log=True,
domain=[1, 80]*ureg.GeV, tex=r'E_{\rm reco}',
bin_names=[str(i) for i in range(40)])
# Test label
_ = b1.label
_ = b1.label
assert b1.basename_binning == b1.basename_binning
assert b1.basename_binning == b3.basename_binning
assert b1.basename_binning != b2.basename_binning
# Oversampling/downsampling
b1_over = b1.oversample(2)
assert b1_over.is_bin_spacing_log_uniform(b1_over.bin_edges)
b1_down = b1.downsample(2)
assert b1_down.is_bin_spacing_log_uniform(b1_down.bin_edges)
assert b1_down.is_compat(b1)
assert b1.is_compat(b1_over)
assert b1_down.is_compat(b1_over)
# Bin width consistency
assert np.isclose(
np.sum(b1_over.bin_widths.m),
np.sum(b1.bin_widths.m),
**ALLCLOSE_KW,
)
assert np.isclose(
np.sum(b1_down.bin_widths.m),
np.sum(b1.bin_widths.m),
**ALLCLOSE_KW,
)
assert np.isclose(
np.sum(b1_over.bin_widths.m),
np.sum(b1_down.bin_widths.m),
**ALLCLOSE_KW,
)
# Weighted bin widths must also sum up to the same total width
assert np.isclose(
np.sum(b1_over.weighted_bin_widths.m),
np.sum(b1.weighted_bin_widths.m),
**ALLCLOSE_KW,
)
assert np.isclose(
np.sum(b1_down.weighted_bin_widths.m),
np.sum(b1.weighted_bin_widths.m),
**ALLCLOSE_KW,
)
assert np.isclose(
np.sum(b1_over.weighted_bin_widths.m),
np.sum(b1_down.weighted_bin_widths.m),
**ALLCLOSE_KW,
)
logging.debug('len(b1): %s', len(b1))
logging.debug('b1: %s', b1)
logging.debug('b2: %s', b2)
logging.debug('b1.oversample(10): %s', b1.oversample(10))
logging.debug('b1.oversample(1): %s', b1.oversample(1))
# Slicing
logging.debug('b1[1:5]: %s', b1[1:5])
logging.debug('b1[:]: %s', b1[:])
logging.debug('b1[-1]: %s', b1[-1])
logging.debug('b1[:-1]: %s', b1[:-1])
logging.debug('copy(b1): %s', copy(b1))
logging.debug('deepcopy(b1): %s', deepcopy(b1))
# Indexing by Ellipsis
assert b1[...] == b1
# Pickling
s = pickle.dumps(b1, pickle.HIGHEST_PROTOCOL)
b1_loaded = pickle.loads(s)
s = pickle.dumps(b1[0], pickle.HIGHEST_PROTOCOL)
b1_loaded = pickle.loads(s)
assert b1_loaded == b1[0]
try:
b1[-1:-3]
except ValueError:
pass
else:
assert False
b3 = OneDimBinning(name='distance', num_bins=10, is_log=True,
domain=[0.1, 10]*ureg.m)
b4 = OneDimBinning(name='distance', num_bins=10, is_log=True,
domain=[1e5, 1e7]*ureg.um)
_ = hash_obj(b3)
_ = b3.hash
_ = hash(b3)
_ = hash_obj(b3[0])
_ = b3[0].hash # pylint: disable=no-member
_ = hash(b3[0])
b3.normalize_values = True
b4.normalize_values = True
_ = hash_obj(b3)
_ = b3.hash
_ = hash(b3)
_ = hash_obj(b3[0])
_ = b3[0].hash # pylint: disable=no-member
_ = hash(b3[0])
# Without rounding, converting bin edges to base units yields different
# results due to finite precision effects
assert np.any(normQuant(b3.bin_edges, sigfigs=None)
!= normQuant(b4.bin_edges, sigfigs=None))
# Normalize function should take care of this
assert np.all(normQuant(b3.bin_edges, sigfigs=HASH_SIGFIGS, full_norm=True)
== normQuant(b4.bin_edges, sigfigs=HASH_SIGFIGS, full_norm=True)), \
'normQuant(b3.bin_edges)=\n%s\nnormQuant(b4.bin_edges)=\n%s' \
%(normQuant(b3.bin_edges, sigfigs=HASH_SIGFIGS, full_norm=True),
normQuant(b4.bin_edges, sigfigs=HASH_SIGFIGS, full_norm=True))
# And the hashes should be equal, reflecting the latter result
assert b3.hash == b4.hash, \
'\nb3=%s\nb4=%s' % (b3.hashable_state, b4.hashable_state)
assert b3.hash == b4.hash, 'b3.hash=%s; b4.hash=%s' %(b3.hash, b4.hash)
s = pickle.dumps(b3, pickle.HIGHEST_PROTOCOL)
b3_loaded = pickle.loads(s)
assert b3_loaded == b3
testdir = tempfile.mkdtemp()
try:
for b in [b1, b2, b3, b4]:
assert eval(repr(b)) == b, repr(b) # pylint: disable=eval-used
b_file = os.path.join(testdir, 'one_dim_binning.json')
b.to_json(b_file, warn=False)
b_ = OneDimBinning.from_json(b_file)
assert b_ == b, 'b=\n%s\nb_=\n%s' %(b, b_)
jsons.to_json(b, b_file, warn=False)
b_ = OneDimBinning.from_json(b_file)
assert b_ == b, 'b=\n%s\nb_=\n%s' %(b, b_)
# Had bug where datastruct containing MultiDimBinning failed to be
# saved. # Test tuple containing list containing OrderedDict
# containing OneDimBinning here.
struct = ([OrderedDict(odb=b)],)
jsons.to_json(struct, b_file, warn=False)
loaded = jsons.from_json(b_file)
b_ = OneDimBinning(**loaded[0][0]['odb'])
assert b_ == b
# Now try with pickle
b_file = os.path.join(testdir, 'one_dim_binning.pkl')
with open(b_file, 'wb') as fobj:
pickle.dump(struct, fobj, protocol=pickle.HIGHEST_PROTOCOL)
with open(b_file, 'rb') as fobj:
loaded = pickle.load(fobj)
b_ = loaded[0][0]['odb']
assert b_ == b
except:
logging.error('b that failed: %s', b)
raise
finally:
shutil.rmtree(testdir, ignore_errors=True)
logging.info('<< PASS : test_OneDimBinning >>')
def test_MultiDimBinning():
"""Unit tests for MultiDimBinning class"""
# pylint: disable=wrong-import-position
import pickle
import os
import shutil
import tempfile
import time
import pytest
# needed so that eval(repr(mdb)) works
from numpy import array, float32, float64 # pylint: disable=unused-variable
b1 = OneDimBinning(name='energy', num_bins=40, is_log=True,
domain=[1, 80]*ureg.GeV)
b2 = OneDimBinning(name='coszen', num_bins=40, is_lin=True,
domain=[-1, 1])
mdb = MultiDimBinning([b1, b2])
assert eval(repr(mdb)) == mdb # pylint: disable=eval-used
_ = hash_obj(mdb)
_ = mdb.hash
_ = hash(mdb)
_ = hash_obj(mdb[0, 0])
_ = mdb[0, 0].hash
_ = hash(mdb[0, 0])
_ = mdb[0, 0]
_ = mdb[0:, 0]
_ = mdb[0:, 0:]
_ = mdb[0, 0:]
_ = mdb[-1, -1]
# TODO: following should work as in Numpy:
# assert mdb[:] == mdb
# assert mdb[0] == b1
# assert mdb[1] == b2
assert mdb[:, :] == mdb
# Index by dim names
assert mdb["energy"] == b1
assert mdb["coszen"] == b2
try:
mdb["nonexistent"]
except Exception:
pass
else:
raise Exception('non-existent name should raise exception')
# Index by dim number
assert MultiDimBinning([b1])[0] == MultiDimBinning([b1[0]])
logging.debug('%s', mdb.energy)
logging.debug('copy(mdb): %s', copy(mdb))
logging.debug('deepcopy(mdb): %s', deepcopy(mdb))
assert deepcopy(mdb) == mdb
s = pickle.dumps(mdb, pickle.HIGHEST_PROTOCOL)
mdb2 = pickle.loads(s)
assert mdb2 == mdb
s = pickle.dumps(mdb[0, 0], pickle.HIGHEST_PROTOCOL)
mdb2 = pickle.loads(s)
assert mdb2 == mdb[0, 0]
binning = MultiDimBinning([
dict(name='energy', is_log=True, domain=[1, 80]*ureg.GeV, num_bins=40),
dict(name='coszen', is_lin=True, domain=[-1, 0], num_bins=20)
])
ord_dict_of_binnings = OrderedDict([('x', mdb), ('y', binning)])
_ = hash_obj(ord_dict_of_binnings)
_ = normQuant(ord_dict_of_binnings)
for flatindex, this_bin in enumerate(binning.iterbins()):
coord = binning.index2coord(flatindex)
assert this_bin == binning[coord]
assert binning.num_bins == [40, 20]
assert binning.tot_num_bins == 40 * 20
assert binning.oversample(10).shape == (400, 200)
assert binning.oversample(10, 1).shape == (400, 20)
assert binning.oversample(1, 3).shape == (40, 60)
assert binning.downsample(4, 2).shape == (10, 10)
assert binning.oversample(coszen=10, energy=2).shape == (80, 200)
assert binning.oversample(1, 1) == binning
assert binning.to('MeV', '')['energy'].units == ureg.MeV
assert binning.to('MeV', '') == binning, 'converted=%s\norig=%s' \
%(binning.to('MeV', ''), binning)
assert binning.to('MeV', '').hash == binning.hash
_ = binning.meshgrid(entity='bin_edges')
_ = binning.meshgrid(entity='weighted_centers')
_ = binning.meshgrid(entity='midpoints')
_ = binning.meshgrid(entity='bin_widths')
_ = binning.meshgrid(entity='weighted_bin_widths')
_ = binning.bin_volumes(attach_units=False)
_ = binning.bin_volumes(attach_units=True)
_ = binning.weighted_bin_volumes(attach_units=False)
_ = binning.weighted_bin_volumes(attach_units=True)
binning.to('MeV', None)
binning.to('MeV', '')
binning.to(ureg.joule, '')
oversampled = binning.oversample(10, 3)
assert oversampled.shape == (400, 60)
downsampled = binning.downsample(4, 2)
assert downsampled.shape == (10, 10)
over_vols = oversampled.bin_volumes(attach_units=False)
down_vols = downsampled.bin_volumes(attach_units=False)
norm_vols = binning.bin_volumes(attach_units=False)
assert np.isclose(np.sum(over_vols), np.sum(norm_vols), **ALLCLOSE_KW)
assert np.isclose(np.sum(down_vols), np.sum(norm_vols), **ALLCLOSE_KW)
assert np.isclose(np.sum(down_vols), np.sum(over_vols), **ALLCLOSE_KW)
over_vols = oversampled.weighted_bin_volumes(attach_units=False)
down_vols = downsampled.weighted_bin_volumes(attach_units=False)
norm_vols = binning.weighted_bin_volumes(attach_units=False)
assert np.isclose(np.sum(over_vols), np.sum(norm_vols), **ALLCLOSE_KW)
assert np.isclose(np.sum(down_vols), np.sum(norm_vols), **ALLCLOSE_KW)
assert np.isclose(np.sum(down_vols), np.sum(over_vols), **ALLCLOSE_KW)
testdir = tempfile.mkdtemp()
try:
b_file = os.path.join(testdir, 'multi_dim_binning.json')
binning.to_json(b_file, warn=False)
b_ = MultiDimBinning.from_json(b_file)
assert b_ == binning, 'binning=\n%s\nb_=\n%s' %(binning, b_)
jsons.to_json(binning, b_file, warn=False)
b_ = MultiDimBinning.from_json(b_file)
assert b_ == binning, 'binning=\n%s\nb_=\n%s' %(binning, b_)
# Had bug where datastruct containing MultiDimBinning failed to be
# saved. Test tuple containing list containing OrderedDict
# containing MultiDimBinning here, just to make sure MultiDimBinning
# can be written inside a nested structure.
b = binning
struct = ([OrderedDict(mdb=b)],)
jsons.to_json(struct, b_file, warn=False)
loaded = jsons.from_json(b_file)
b_ = MultiDimBinning(**loaded[0][0]['mdb'])
assert b_ == b
# Now try with pickle
b_file = os.path.join(testdir, 'multi_dim_binning.pkl')
with open(b_file, 'wb') as fobj:
pickle.dump(struct, fobj, protocol=pickle.HIGHEST_PROTOCOL)
with open(b_file, 'rb') as fobj:
loaded = pickle.load(fobj)
b_ = loaded[0][0]['mdb']
assert b_ == b
finally:
shutil.rmtree(testdir, ignore_errors=True)
# Test that reordering dimensions works correctly
e_binning = OneDimBinning(
name='true_energy', num_bins=40, is_log=True, domain=[1, 80]*ureg.GeV
)
reco_e_binning = OneDimBinning(
name='reco_energy', num_bins=40, is_log=True, domain=[1, 80]*ureg.GeV
)
cz_binning = OneDimBinning(
name='true_coszen', num_bins=40, is_lin=True, domain=[-1, 1]
)
reco_cz_binning = OneDimBinning(
name='reco_coszen', num_bins=40, is_lin=True, domain=[-1, 1]
)
az_binning = OneDimBinning(
name='true_azimuth', num_bins=10, is_lin=True,
domain=[0*ureg.rad, 2*np.pi*ureg.rad]
)
reco_az_binning = OneDimBinning(
name='reco_azimuth', num_bins=10, is_lin=True,
domain=[0*ureg.rad, 2*np.pi*ureg.rad]
)
mdb_2d_orig = MultiDimBinning([e_binning, cz_binning])
orig_order = mdb_2d_orig.names
# Reverse ordering; reorder by dimension names
new_order = orig_order[::-1]
mdb_2d_new = MultiDimBinning(mdb_2d_orig)
mdb_2d_new = mdb_2d_new.reorder_dimensions(new_order)
assert mdb_2d_new.names == new_order
new_order = ['true_azimuth', 'true_energy', 'true_coszen']
mdb_2d_new = mdb_2d_new.reorder_dimensions(new_order)
assert mdb_2d_new == mdb_2d_orig
_ = MultiDimBinning([e_binning, cz_binning])
mdb_3d_orig = MultiDimBinning([e_binning, cz_binning, az_binning])
orig_order = mdb_3d_orig.names
new_order = [orig_order[2], orig_order[0], orig_order[1]]
mdb_3d_new = MultiDimBinning(mdb_3d_orig)
mdb_3d_new = mdb_3d_new.reorder_dimensions(new_order)
assert mdb_3d_new.names == new_order
# Reorder by MultiDimBinning object
mdb_3d_new = mdb_3d_new.reorder_dimensions(mdb_3d_orig)
assert mdb_3d_new.names == orig_order
# Reorder by indices
mdb_3d_new = MultiDimBinning(mdb_3d_orig)
mdb_3d_new = mdb_3d_new.reorder_dimensions([2, 0, 1])
assert mdb_3d_new.names == new_order
# Reorder by combination of index, OneDimBinning, and name
mdb_3d_new = MultiDimBinning(mdb_3d_orig)
mdb_3d_new = mdb_3d_new.reorder_dimensions(
[2, 'true_energy', mdb_2d_orig.dimensions[1]]
)
assert mdb_3d_new.names == new_order
# Reorder by superset
mdb_2d_new = MultiDimBinning(mdb_3d_orig.dimensions[0:2])
mdb_2d_new = mdb_2d_new.reorder_dimensions(new_order)
assert mdb_2d_new.names == [o for o in new_order if o in mdb_2d_new]
# Reorder by subset
mdb_3d_new = MultiDimBinning(mdb_3d_orig)
try:
mdb_3d_new = mdb_3d_new.reorder_dimensions(new_order[0:2])
except Exception:
pass
else:
raise Exception('Should not be able to reorder by subset.')
# Create a basename-equivalent binning
mdb_3d_reco = MultiDimBinning([reco_e_binning, reco_cz_binning,
reco_az_binning])
assert mdb_3d_reco.basename_binning == mdb_3d_orig.basename_binning
t0 = time.time()
_ = [tup for tup in mdb_3d_reco.iteredgetuples()]
tprofile.info('Time to iterate over %d edge tuples: %.6f sec',
mdb_3d_reco.size, time.time() - t0)
t0 = time.time()
_ = [b for b in mdb_3d_reco.iterbins()]
tprofile.info('Time to iterate over %d bins: %.6f sec',
mdb_3d_reco.size, time.time() - t0)
# Test compatibility test
mdb1 = MultiDimBinning([
OneDimBinning(name='energy', num_bins=40, is_log=True, domain=[1, 80]*ureg.GeV),
OneDimBinning(name='coszen', num_bins=40, is_lin=True, domain=[-1, 1])
])
mdb2 = MultiDimBinning([
OneDimBinning(name='energy', num_bins=40, is_log=True, domain=[1, 80]*ureg.GeV),
OneDimBinning(name='coszen', num_bins=40, is_lin=True, domain=[-1, 1])
])
# These should be compatible
assert mdb1.is_compat(mdb2)
mdb1.assert_compat(mdb2)
assert mdb2.is_compat(mdb1)
mdb2.assert_compat(mdb1)
mdb2 = MultiDimBinning([
OneDimBinning(name='energy', num_bins=20, is_log=True, domain=[1, 80]*ureg.GeV),
OneDimBinning(name='coszen', num_bins=20, is_lin=True, domain=[-1, 1])
])
# In this direction, they should *not* be compatible
assert not mdb1.is_compat(mdb2)
with pytest.raises(AssertionError):
mdb1.assert_compat(mdb2)
# In this direction, they *should* be compatible (downsampling)
assert mdb2.is_compat(mdb1)
mdb2.assert_compat(mdb1)
logging.info('<< PASS : test_MultiDimBinning >>')
if __name__ == "__main__":
set_verbosity(1)
test_OneDimBinning()
test_MultiDimBinning()