diff --git a/gwinc/nb.py b/gwinc/nb.py index 92ade0c57efdf7da64db4a074a3ea518382bb092..f6c650ee6bf766d4d6341e08d5a3dac67ba64548 100644 --- a/gwinc/nb.py +++ b/gwinc/nb.py @@ -1,11 +1,13 @@ import os import logging +import operator import itertools import importlib import importlib.util import collections import numpy as np import scipy.interpolate +from functools import reduce def quadsum(data): @@ -56,10 +58,11 @@ class BudgetItem: """Initialize budget item. The primary argument should be the evaluation frequency array. - If it is not provided, then it is assumed to be a pre-defined - attribute of the BudgetItem class. Any keyword arguments - provided are simple written as attribute variables in the - initialized object. + If not provided, then a pre-defined `freq` attribute of the + BudgetItem class should exist. + + Additional keyword arguments are written as attribute + variables to the initialized object. """ if freq is not None: @@ -133,22 +136,24 @@ class Noise(BudgetItem): style = {} """Trace plot style dictionary""" - def calc_trace(self, calibration=None, calc=True): + def calc_trace(self, calibrations=None, calc=True): """Returns noise (PSD, style) tuple. - If `calibration` is not None it is assumed to be a - len(self.freq) array that will be multiplied to the output + If `calibrations` is not None it is assumed to be a list of + len(self.freq) arrays that will be multiplied to the output PSD. If calc=False, the noise will not be calculated and the PSD - will be None. This is useful for just getting style the - style. + will be None. This is useful for just getting the style. """ if calc: data = self.calc() - if calibration is not None: - data *= calibration + if calibrations: + data *= reduce( + operator.mul, + calibrations, + ) else: data = None return data, self.style @@ -168,11 +173,11 @@ class Noise(BudgetItem): class Budget(Noise): """GWINC Budget class - This is a Noise that represents the budget of multiple sub noises. + This is a Noise that represents a budget of multiple sub noises. - The `noises` attribute of this class should be list constituent - Noise classes. Each element can be either a single Noise class, - or a tuple of (Noise, Calibration) classes, e.g.: + The `noises` attribute of this class should list constituent Noise + classes. Each element can be either a single Noise class, or a + tuple of (Noise, Calibration) classes, e.g.: noises = [ Thermal, @@ -186,18 +191,32 @@ class Budget(Noise): the relevant calibration is evaluated and applied, and the quadrature sum of all calibrated consituent noises is returned. - Additionally a `references` attribute may be definied, similar to - the `noises` attribute described above except that the specified - noises do not contribute to the overall budget total. + Additionally, a `calibrations` attribute may define a list of + common calibrations to apply to all noises, e.g.: + + calibrations = [ + Strain, + ] + + Finally, a `references` attribute may be defined, similar to the + `noises` attribute described above except that the specified + noises do not contribute to the overall budget total, e.g.: - NOTE: an `ifo` attribute is always passed as an initialization - argument to sub noises. + references = [ + strain_data_20200120, + ] + + NOTE: if an `ifo` attribute is defined it is always passed as an + initialization argument to sub noises. """ noises = [] """List of constituent noise classes""" + calibrations = [] + """List of calibrations to be applied to all noises in budget""" + references = [] """List of reference nosie classes""" @@ -226,22 +245,20 @@ class Budget(Noise): self._noise_objs = collections.OrderedDict() # all cal objects keyed by name self._cal_objs = {} - # noise to calibration mapping - self._noise_cal = {} + # set of calibration names to apply to noise + self._noise_cals = collections.defaultdict(set) # set of all constituent budget noise names self._budget_noises = set() # initialize all noise objects for nc in self.noises: - name, noise_obj, cal = self.__init_noise(nc) - if noises and name not in noises: - continue - self.__add_noise(noise_obj, cal) + name = self.__init_noise(nc, noises) self._budget_noises.add(name) + # add common calibrations + for cal in self.calibrations: + self.__add_calibration(cal, self._budget_noises) + # add references for nc in self.references: - name, noise_obj, cal = self.__init_noise(nc) - if noises and name not in noises: - continue - self.__add_noise(noise_obj, cal) + self.__init_noise(nc, noises) # error if requested noise is not present if noises: sset = set(noises) @@ -249,29 +266,44 @@ class Budget(Noise): if not sset <= nset: raise AttributeError("unknown noise terms: {}".format(' '.join(sset-nset))) - def __init_noise(self, nc): + def __init_noise(self, nc, noise_filt): cal = None if isinstance(nc, (list, tuple)): - noise, cal = nc[:2] + noise = nc[0] + cals = nc[1] else: noise = nc + cals = [] + if noise_filt and noise not in noise_filt: + return + name = self.__add_noise(noise) + for cal in cals: + self.__add_calibration(cal, [noise]) + return name + + def __add_noise(self, noise): noise_obj = noise( *self.args, **self.kwargs ) - return noise_obj.name, noise_obj, cal - - def __add_noise(self, noise_obj, cal): - logging.debug("init {}".format(noise_obj)) - # instantiate the noise object name = noise_obj.name + logging.debug("init {}".format(noise_obj)) self._noise_objs[name] = noise_obj - if cal is not None: - # if a cal object is specified, instantiate and store - cal_obj = cal(*self.args, **self.kwargs) - if cal_obj.name not in self._cal_objs: - self._cal_objs[cal_obj.name] = cal_obj - self._noise_cal[name] = cal_obj.name + return name + + def __add_calibration(self, cal, noises): + cal_obj = cal( + *self.args, + **self.kwargs + ) + name = cal_obj.name + if name not in self._cal_objs: + logging.debug("init {}".format(cal_obj)) + self._cal_objs[name] = cal_obj + # register noises for this calibration + for noise in noises: + self._noise_cals[noise].add(name) + return name def __getitem__(self, name): try: @@ -339,13 +371,6 @@ class Budget(Noise): logging.debug("update {}".format(item)) item.update(**kwargs) - def cal_for_noise(self, name): - """Return the calibration object for named noise.""" - try: - return self._cal_objs[self._noise_cal[name]] - except KeyError: - return None - def calc_noise(self, name): """Return calibrated individual noise term. @@ -354,13 +379,12 @@ class Budget(Noise): """ noise = self._noise_objs[name] - nd = noise.calc() - cal = self.cal_for_noise(name) - if cal: - cd = cal.calc() - return nd * cd + cals = [self._cal_objs[cal].calc() for cal in self._noise_cals[name]] + data = noise.calc_trace(cals) + if isinstance(data, dict): + return data['Total'][0] else: - return nd + return data[0] def calc(self): """Calculate sum of all noises. @@ -369,7 +393,7 @@ class Budget(Noise): data = [self.calc_noise(name) for name in self._noise_objs.keys() if name in self._budget_noises] return quadsum(data) - def calc_trace(self, calibration=None, calc=True): + def calc_trace(self, calibrations=None, calc=True): """Returns a dictionary of noises traces, keyed by noise names. Values are (data, style) trace tuples (see Noise.calc_trace). @@ -377,9 +401,9 @@ class Budget(Noise): budgets are themselves dictionaries returned from calc_trace() of the sub budget. - If `calibration` is not None it is assumed to be a - len(self.freq) array that will be multiplied to the output - PSD of the budget and all sub noises. + If `calibrations` is not None it is assumed to be a list of + len(self.freq) array that will be multiplied to the output PSD + of the budget and all sub noises. If calc=False, the noise will not be calculated and the PSD will be None. This is useful for just getting style the @@ -408,18 +432,19 @@ class Budget(Noise): # if we're not calc'ing, just return the dict now if not calc: return d + + # calc all calibrations + c = {} + for name, cal in self._cal_objs.items(): + c[name] = cal.calc() # calc all noises for name, noise in self._noise_objs.items(): - # extract/calc the budget-level calibration for this noise - cal_obj = self.cal_for_noise(name) - if cal_obj: - cal = cal_obj.calc() - else: - cal = np.ones_like(self.freq) - # then multiply by the supplied calibration - if calibration is not None: - cal *= calibration - d[name] = noise.calc_trace(calibration=cal, calc=True) + cals = [c[cal] for cal in self._noise_cals[name]] + if calibrations: + cals += calibrations + d[name] = noise.calc_trace( + calibrations=cals, + ) # calc budget total constituent_data = [] for name in self._budget_noises: