Forked from
gwinc / pygwinc
261 commits behind the upstream repository.
-
Jameson Rollins authoredJameson Rollins authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
nb.py 14.06 KiB
import operator
import itertools
import collections
import numpy as np
import scipy.interpolate
from functools import reduce
from . import logger
def quadsum(data):
"""Calculate quadrature sum of list of data arrays.
Provided data are assumed to be power-referred, so this is a
simple point-by-point sum.
NaNs in sum elements do not contribute to sum.
"""
return np.nansum(data, 0)
class BudgetItem:
"""GWINC BudgetItem class
"""
def load(self):
"""Overload method for initial loading of static data.
"""
return None
def update(self, **kwargs):
"""Overload method for updating data.
By default any keyword arguments provided are written directly
as attribute variables (as with __init__).
"""
for key, val in kwargs.items():
setattr(self, key, val)
def calc(self):
"""Overload method for final PSD calculation.
Should return an array of power-referenced values evaluated at
all evaluation frequencies (self.freq).
"""
return None
##########
def __init__(self, freq=None, **kwargs):
"""Initialize budget item.
The primary argument should be the evaluation frequency array.
If not provided, then a pre-defined `freq` attribute of the
BudgetItem class should exist.
Additional keyword arguments are written as attribute
variables to the initialized object.
"""
if freq is not None:
assert isinstance(freq, np.ndarray)
self.freq = freq
elif not hasattr(self, 'freq'):
raise AttributeError("Frequency array not provided or defined.")
for key, val in kwargs.items():
setattr(self, key, val)
@property
def name(self):
""""Name of this BudgetItem class."""
return self.__class__.__name__
def __str__(self):
# FIXME: provide info on internal state (load/update/calc/etc.)
return '<{} {}>'.format(
', '.join([c.__name__ for c in self.__class__.__bases__]),
self.name,
)
def interpolate(self, freq, data):
"""Interpolate data to the evaluation frequencies.
"""
func = scipy.interpolate.interp1d(
freq, data,
kind='nearest',
copy=False,
assume_sorted=True,
bounds_error=False,
fill_value=np.nan,
)
return func(self.freq)
class Calibration(BudgetItem):
"""GWINC Calibration class
BudgetItem that represents a calibration transfer function for a
Noise. The calc() method should return a transfer function
amplitude array evaluated at the evaluation frequencies supplied
at initialization and available in the `freq` array attribute
(self.freq).
"""
def __call__(self, data):
"""Calibrate input data.
Returns calibrated version of input data array,
e.g. point-by-point product of data and calibration arrays.
"""
cal = self.calc()
assert data.shape == cal.shape, \
"data shape does not match calibration ({} != {})".format(data.shape, cal.shape)
return data * cal
class Noise(BudgetItem):
"""GWINC Noise class
BudgetItem that represents a PSD noise calculation. The calc()
method should return the noise PSD spectrum array evaluated at the
evaluation frequencies supplied at initialization and available in
the `freq` array attribute (self.freq).
"""
style = {}
"""Trace plot style dictionary"""
def calc_trace(self, calibrations=None, calc=True):
"""Returns noise (PSD, style) tuple.
If `calibrations` is not None it is assumed to be a list of
len(self.freq) arrays that will be multiplied to the output
PSD.
If calc=False, the noise will not be calculated and the PSD
will be None. This is useful for just getting the style.
"""
if calc:
data = self.calc()
if calibrations:
data *= reduce(
operator.mul,
calibrations,
)
else:
data = None
return data, self.style
def run(self, **kwargs):
"""Convenience method to load, update, and return calc traces.
Equivalent of load(), update(), calc_traces() run in sequence.
Keyword arguments are passed to update().
"""
self.load()
self.update(**kwargs)
return self.calc_trace()
class Budget(Noise):
"""GWINC Budget class
This is a Noise that represents a budget of multiple sub noises.
The `noises` attribute of this class should list constituent Noise
classes. Each element can be either a single Noise class, or a
tuple of (Noise, Calibration) classes, e.g.:
noises = [
Thermal,
(Shot, Sensing),
]
When this object is initialized, all sub noises and calibrations
are initialized. Pre-defined load() and update() methods call the
load() and update() methods of all sub noises and calibrations.
When calc() is called, the PSD is calculated for all sub noises,
the relevant calibration is evaluated and applied, and the
quadrature sum of all calibrated consituent noises is returned.
Additionally, a `calibrations` attribute may define a list of
common calibrations to apply to all noises, e.g.:
calibrations = [
Strain,
]
Finally, a `references` attribute may be defined, similar to the
`noises` attribute described above except that the specified
noises do not contribute to the overall budget total, e.g.:
references = [
strain_data_20200120,
]
NOTE: if an `ifo` attribute is defined it is always passed as an
initialization argument to sub noises.
"""
noises = []
"""List of constituent noise classes, or (noise, cal) tuples"""
calibrations = []
"""List of calibrations to be applied to all budget noises (not references)"""
references = []
"""List of reference noise classes, or (ref, cal) tuples"""
def __init__(self, *args, noises=None, **kwargs):
"""Initialize Budget object.
See BudgetItem for base initialization arguments.
If a `noises` keyword argument is provided it should be an
iterable of noise names (constituent or reference) which will
be used to filter the noises initialized in this budget.
"""
super().__init__(*args, **kwargs)
# store args and kwargs for later use
self.args = args
self.kwargs = kwargs
# FIXME: special casing the IFO here, in case it's defined as
# a class attribute rather than passed at initialization. we
# do this because we're not defining a standard way to extract
# IFO variables that get passed around in a reasonable way.
# how can we clarify this?
if 'ifo' not in kwargs and getattr(self, 'ifo', None):
self.kwargs['ifo'] = getattr(self, 'ifo', None)
# all noise objects keyed by name
self._noise_objs = collections.OrderedDict()
# all cal objects keyed by name
self._cal_objs = {}
# set of calibration names to apply to noise
self._noise_cals = collections.defaultdict(set)
# set of all constituent budget noise names
self._budget_noises = set()
# initialize all noise objects
for nc in self.noises:
name = self.__init_noise(nc, noises)
if name:
self._budget_noises.add(name)
# initialize common calibrations and add to all budget noises
for cal in self.calibrations:
self.__add_calibration(cal, self._budget_noises)
# initialize references, without common calibrations
for nc in self.references:
self.__init_noise(nc, noises)
# error if requested noise is not present
if noises:
sset = set(noises)
nset = set([name for name in self._noise_objs.keys()])
if not sset <= nset:
raise AttributeError("unknown noise terms: {}".format(' '.join(sset-nset)))
def __init_noise(self, nc, noise_filt):
cal = None
if isinstance(nc, (list, tuple)):
noise = nc[0]
cals = nc[1:]
else:
noise = nc
cals = []
noise_obj = noise(
*self.args,
**self.kwargs
)
name = noise_obj.name
if noise_filt and name not in noise_filt:
return
logger.debug("init {}".format(noise_obj))
self._noise_objs[name] = noise_obj
for cal in cals:
self.__add_calibration(cal, [name])
return name
def __add_calibration(self, cal, noises):
cal_obj = cal(
*self.args,
**self.kwargs
)
name = cal_obj.name
if name not in self._cal_objs:
logger.debug("init {}".format(cal_obj))
self._cal_objs[name] = cal_obj
# register noises for this calibration
for noise in noises:
self._noise_cals[noise].add(name)
return name
def __getitem__(self, name):
try:
return self._noise_objs[name]
except KeyError:
try:
return self._cal_objs[name]
except KeyError:
raise KeyError("unknown noise or cal name '{}".format(name))
def keys(self):
"""Iterate over budget noise names."""
return iter(self._noise_objs.keys())
def values(self):
"""Iterate over budget noise objects."""
return iter(self._noise_objs.values())
def items(self):
"""Iterate over budget noise (name, object) tuples."""
return iter(self._noise_objs.items())
def __iter__(self):
return iter(self.keys())
def walk(self):
"""Walk recursively through every BudgetItem in the budget.
This includes Noise, Calibration and Budget objects, as well
as any decendents of Budget objects.
For each leaf item yields a tuple of all ancestor objects,
e.g.:
(self)
(self, BudgetItem)
(self, ChildBudget1)
(self, ChildBudget1, BudgetItem)
...
"""
yield (self,)
for item in itertools.chain(
self._cal_objs.values(),
self._noise_objs.values()):
if isinstance(item, Budget):
for i in item.walk():
yield (self,) + i
else:
yield (self, item)
def load(self):
"""Load all noise and cal objects."""
for name, item in itertools.chain(
self._cal_objs.items(),
self._noise_objs.items()):
logger.debug("load {}".format(item))
item.load()
def update(self, **kwargs):
"""Update all noise and cal objects with supplied kwargs."""
for name, item in itertools.chain(
self._cal_objs.items(),
self._noise_objs.items()):
logger.debug("update {}".format(item))
item.update(**kwargs)
def calc_noise(self, name):
"""Return calibrated individual noise term.
The noise PSD and calibration transfer functions are
calculated, and the calibrated noise array is returned.
"""
noise = self._noise_objs[name]
cals = [self._cal_objs[cal].calc() for cal in self._noise_cals[name]]
data = noise.calc_trace(cals)
if isinstance(data, dict):
return data['Total'][0]
else:
return data[0]
def calc(self):
"""Calculate sum of all noises.
"""
data = [self.calc_noise(name) for name in self._noise_objs.keys() if name in self._budget_noises]
return quadsum(data)
def calc_trace(self, calibrations=None, calc=True):
"""Returns a dictionary of noises traces, keyed by noise names.
Values are (data, style) trace tuples (see Noise.calc_trace).
The key of the budget sum total is 'Total'. The values of sub
budgets are themselves dictionaries returned from
calc_trace() of the sub budget.
If `calibrations` is not None it is assumed to be a list of
len(self.freq) array that will be multiplied to the output PSD
of the budget and all sub noises.
If calc=False, the noise will not be calculated and the PSD
will be None. This is useful for just getting style the
style.
"""
# start by creating an empty OrderedDict used for outputing trace data
# or style info with the following order:
# references
# total
# constituents
d = collections.OrderedDict()
# allocate references
for name, noise in self._noise_objs.items():
if name in self._budget_noises:
continue
d[name] = noise.calc_trace(calc=False)
# allocate total
if self._budget_noises:
d['Total'] = None, self.style
# allocate constituent
for name, noise in self._noise_objs.items():
if name not in self._budget_noises:
continue
d[name] = noise.calc_trace(calc=False)
# if we're not calc'ing, just return the dict now
if not calc:
return d
# calc all calibrations
c = {}
for name, cal in self._cal_objs.items():
c[name] = cal.calc()
# calc all noises
for name, noise in self._noise_objs.items():
cals = [c[cal] for cal in self._noise_cals[name]]
if calibrations:
cals += calibrations
d[name] = noise.calc_trace(
calibrations=cals,
)
# calc budget total
constituent_data = []
for name in self._budget_noises:
if isinstance(d[name], dict):
data = d[name]['Total'][0]
else:
data = d[name][0]
constituent_data.append(data)
d['Total'] = quadsum(constituent_data), self.style
return d