#!/usr/bin/env python
#
# Copyright (C) 2017-2018  Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.



####################
# 
#     preamble
#
#################### 


from collections import Counter, defaultdict, deque
import glob
import itertools
import logging
import operator
import os
import sys
import timeit

import h5py
import numpy

from lal import gpstime
from lal.utils import CacheEntry

from gstlal import aggregator


####################
# 
#    functions
#
####################

#----------------------------------
### hdf5 utilities

def get_dataset(path, base, name = 'data', group = None):
	"""
	open a dataset at @param path with name @param base and return the data
	"""
	fname = os.path.join(path, "%s.h5" % base)
	try:
		with h5py.File(fname, 'r') as hfile:
			if group:
				data = numpy.array(hfile[group][name])
			else:
				data = numpy.array(hfile[name])
		return fname, data
	except IOError:
		return fname, []

def create_new_dataset(path, base, data, name = 'data', group = None, tmp = False, metadata = None):
	"""
	A function to create a new dataset with data @param data.
	The data will be stored in an hdf5 file at path @param path with
	base name @param base.  You can also make a temporary file.
	If specified, will also save metadata given as key value pairs.

	Returns the filename where the dataset was created.
	"""
	if tmp:
		fname = os.path.join(path, "%s.h5.tmp" % base)
	else:
		fname = os.path.join(path, "%s.h5" % base)

	# create dir if it doesn't exist
	if not os.path.exists(path):
		aggregator.makedir(path)

	# save data to hdf5
	with h5py.File(fname, 'a') as hfile:

		# set global metadata if specified
		if metadata and not hfile.attrs:
			for key, value in metadata.items():
				hfile.attrs.create(key, value)

		# create dataset
		if group:
			if group not in hfile:
				hfile.create_group(group)
			hfile[group].create_dataset(name, data=data)
		else:
			hfile.create_dataset(name, data=data)

	return fname
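
# A minimal round-trip sketch for the two hdf5 helpers above (hypothetical path,
# base name and group; assumes the target directory is writable):
#
#   >>> fname = create_new_dataset('/tmp/features', 'H1-FEATURES-1234512345-20',
#   ...                            numpy.arange(10, dtype=numpy.float32), group='H1:CHANNEL')
#   >>> fname, data = get_dataset('/tmp/features', 'H1-FEATURES-1234512345-20', group='H1:CHANNEL')
#   >>> data.shape
#   (10,)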

#----------------------------------
### gps time utilities

def in_new_epoch(new_gps_time, prev_gps_time, gps_epoch):
	"""
	Returns whether new and old gps times are in different
	epochs.

	>>> in_new_epoch(1234561200, 1234560000, 1000)
	True
	>>> in_new_epoch(1234561200, 1234560000, 10000)
	False

	"""
	return (new_gps_time - floor_div(prev_gps_time, gps_epoch)) >= gps_epoch

def floor_div(x, n):
	"""
	Floor x to the nearest multiple of n.

	>>> floor_div(163, 10)
	160
	>>> floor_div(158, 10)
	150

	"""
	assert n > 0
	return (x // n) * n

def gps2latency(gps_time):
	"""
	Given a gps time, measures the latency to ms precision relative to now.
	"""
	current_gps_time = float(gpstime.tconvert('now')) + (timeit.default_timer() % 1)
	return round(current_gps_time - gps_time, 3)
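
# For illustration: if a trigger's gps_time is 1234567890.123 and 'now' corresponds
# to gps time 1234567891.623, gps2latency returns 1.5 (seconds).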

#----------------------------------
### pathname utilities

def to_trigger_path(rootdir, basename, start_time, job_id=None, subset_id=None):
	"""
	Given a basepath, instrument, description, start_time, will return a
	path pointing to a directory structure in the form::

		${rootdir}/${basename}/${basename}-${start_time_mod1e5}/

    and if the optional job_id, subset_id kwargs are given, the path will be of the form::

		${rootdir}/${basename}/${basename}-${start_time_mod1e5}/${basename}-${job_id}-${subset_id}/
	"""
	start_time_mod1e5 = str(start_time).zfill(10)[:5]
	if job_id and subset_id:
		trigger_path = os.path.join(rootdir, basename, '-'.join([basename, start_time_mod1e5]), '-'.join([basename, job_id, subset_id]))
	else:
		trigger_path = os.path.join(rootdir, basename, '-'.join([basename, start_time_mod1e5]))
	return trigger_path
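
# For illustration (hypothetical root directory and base name):
#
#   >>> to_trigger_path('/data', 'H1-FEATURES', 1234567890)
#   '/data/H1-FEATURES/H1-FEATURES-12345'
#   >>> to_trigger_path('/data', 'H1-FEATURES', 1234567890, job_id='0001', subset_id='3')
#   '/data/H1-FEATURES/H1-FEATURES-12345/H1-FEATURES-0001-3'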

def to_trigger_filename(basename, start_time, duration, suffix, tmp=False):
	"""
	Given an instrument, description, start_time, and duration, will return a
	filename suitable with the T050017 file naming convention, in the form::

		${basename}-${start_time}-{duration}.${suffix}

	or if a temporary file is requested::

		${basename}-${start_time}-{duration}.${suffix}.tmp
	"""
	if tmp:
		return '%s-%d-%d.%s.tmp' % (basename, start_time, duration, suffix)
	else:
		return '%s-%d-%d.%s' % (basename, start_time, duration, suffix)
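
# For illustration (hypothetical base name and times):
#
#   >>> to_trigger_filename('H1-FEATURES', 1234567890, 20, 'h5')
#   'H1-FEATURES-1234567890-20.h5'
#   >>> to_trigger_filename('H1-FEATURES', 1234567890, 20, 'h5', tmp=True)
#   'H1-FEATURES-1234567890-20.h5.tmp'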

def latency_name(stage_name, stage_num, channel, rate=None):
	"""
	Returns a properly formatted latency element name based on stage,
	channel, and rate information.
	"""
	if rate:
		return 'stage%d_%s_%s_%s' % (stage_num, stage_name, str(rate).zfill(5), channel)
	else:
		return 'stage%d_%s_%s' % (stage_num, stage_name, channel)
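
# For illustration (hypothetical stage and channel names):
#
#   >>> latency_name('whitener', 2, 'H1:CAL-DELTAL_EXTERNAL_DQ', rate=512)
#   'stage2_whitener_00512_H1:CAL-DELTAL_EXTERNAL_DQ'
#   >>> latency_name('extractor', 3, 'H1:CAL-DELTAL_EXTERNAL_DQ')
#   'stage3_extractor_H1:CAL-DELTAL_EXTERNAL_DQ'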

#----------------------------------
### logging utilities

def get_logger(logname, log_level=10, rootdir='.', verbose=False):
	"""
	Standardize how we instantiate loggers.
	"""
	logger = logging.getLogger(logname)
	logger.setLevel(log_level)

	# set up FileHandler for output file
	log_path = os.path.join(rootdir, logname + '.log')
	handlers = [logging.FileHandler(log_path)]

	# set up handler for stderr
	if verbose:
		handlers.append(logging.StreamHandler())

	# add handlers to logger
	formatter = logging.Formatter('%(asctime)s | %(name)s : %(levelname)s : %(message)s')
	for handler in handlers:
		handler.setFormatter(formatter)
		logger.addHandler(handler)

	return logger
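
# A usage sketch (hypothetical log name; this writes ./monitor.log and, with
# verbose=True, also echoes records to stderr via a StreamHandler):
#
#   >>> logger = get_logger('monitor', log_level=logging.INFO, verbose=True)
#   >>> logger.info('pipeline started')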

#----------------------------------
### cache utilities

def path2cache(rootdir, pathname):
	"""
	given a rootdir and a glob-compatible pathname that may contain shell-style wildcards,
	will find all files that match and populate a Cache.
	NOTE: this will only work with files that comply with the T050017 file convention.
	"""
	return [CacheEntry.from_T050017(file_) for file_ in glob.iglob(os.path.join(rootdir, pathname))]
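
# For illustration (hypothetical directory layout; the matched files must follow
# the T050017 naming convention for CacheEntry.from_T050017 to parse them):
#
#   >>> cache = path2cache('/data', 'H1-FEATURES/*/*.h5')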

#----------------------------------
### other utilities

def group_indices(indices):
	"""
	Given a list of indices, groups them into contiguous runs.
	"""
	for k, group in itertools.groupby(enumerate(indices), lambda pair: pair[0] - pair[1]):
		yield list(map(operator.itemgetter(1), group))
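
# For illustration, contiguous runs of indices are yielded one group at a time:
#
#   >>> list(group_indices([0, 1, 2, 5, 6, 9]))
#   [[0, 1, 2], [5, 6], [9]]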


####################
#
#     classes
#
####################

#----------------------------------
### Feature I/O structures

class FeatureData(object):
	"""
	Base class for saving feature data.
	Extend for a specific file-based implementation.
	"""
	def __init__(self, columns, keys = None, **kwargs):
		self.keys = keys
		self.columns = columns
		self.feature_data = {}

	def dump(self, path):
		raise NotImplementedError

	def append(self, key, value):
		raise NotImplementedError

	def clear(self):
		raise NotImplementedError

class HDF5TimeseriesFeatureData(FeatureData):
	"""
	Saves feature data to hdf5 as regularly sampled timeseries.
	"""
	def __init__(self, columns, keys, **kwargs):
		super(HDF5TimeseriesFeatureData, self).__init__(columns, keys = keys, **kwargs)
		self.cadence = kwargs['cadence']
		self.sample_rate = kwargs['sample_rate']
		self.waveform = kwargs['waveform']
		self.metadata = dict(**kwargs)
		self.dtype = [(column, numpy.float32) for column in self.columns]
		self.feature_data = {key: numpy.empty((self.cadence * self.sample_rate,), dtype = self.dtype) for key in keys}
		self.last_save_time = 0
		self.clear()

	def dump(self, path, base, start_time, tmp = False):
		"""
		Saves the current cadence of features to disk and clears out the stored data
		"""
		for key in self.keys:
			nonnan_indices = list(numpy.where(numpy.isfinite(self.feature_data[key]['time']))[0])

			### split up and save datasets into contiguous segments
			for idx_group in group_indices(nonnan_indices):
				start_idx, end_idx = idx_group[0], idx_group[-1]
				start = start_time + float(start_idx) / self.sample_rate
				end = start_time + float(end_idx + 1) / self.sample_rate
				name = "%.6f_%.6f" % (float(start), float(end - start))
				create_new_dataset(path, base, self.feature_data[key][start_idx:end_idx+1], name=name, group=key, tmp=tmp, metadata=self.metadata)

		### clear out current features
		self.clear()

	def append(self, timestamp, features):
		"""
		Appends a feature buffer to the data structure
		"""
		self.last_save_time = floor_div(timestamp, self.cadence)
		time_idx = (timestamp - self.last_save_time) * self.sample_rate

		for key in features.keys():
			for row_idx, row in enumerate(features[key]):
				if row:
					idx = time_idx + row_idx
					self.feature_data[key][idx] = numpy.array(tuple(row[col] for col in self.columns), dtype=self.dtype)

	def clear(self):
		for key in self.keys:
			self.feature_data[key][:] = numpy.nan
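
# A minimal usage sketch for HDF5TimeseriesFeatureData (hypothetical channel, path
# and metadata; cadence, sample_rate and waveform are required keyword arguments):
#
#   >>> data = HDF5TimeseriesFeatureData(['time', 'snr'], ['H1:CHANNEL'],
#   ...                                  cadence=20, sample_rate=1, waveform='sine_gaussian')
#   >>> data.append(1234567890, {'H1:CHANNEL': [{'time': 1234567890.2, 'snr': 5.0}]})
#   >>> data.dump('/tmp/features', 'H1-FEATURES-1234567880-20', data.last_save_time)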

class HDF5ETGFeatureData(FeatureData):
	"""!
	Saves feature data with varying dataset lengths (when run in ETG mode) to hdf5.
	"""
	def __init__(self, columns, keys, **kwargs):
		super(HDF5ETGFeatureData, self).__init__(columns, keys = keys, **kwargs)
		self.cadence = kwargs['cadence']
		self.waveform = kwargs['waveform']
		self.metadata = dict(**kwargs)
		self.dtype = [(column, numpy.float32) for column in self.columns]
		self.feature_data = {key: [] for key in keys}
		self.clear()

	def dump(self, path, base, start_time, tmp = False):
		"""
		Saves the current cadence of gps triggers to disk and clears out the stored data
		"""
		name = "%d_%d" % (start_time, self.cadence)
		for key in self.keys:
			create_new_dataset(path, base, numpy.array(self.feature_data[key], dtype=self.dtype), name=name, group=key, tmp=tmp, metadata=self.metadata)
		self.clear()

	def append(self, timestamp, features):
		"""
		Appends a trigger row to the data structure

		NOTE: timestamp arg is here purely to match API, not used in append
		"""
		for key in features.keys():
			for row in features[key]:
				self.feature_data[key].append(tuple(row[col] for col in self.columns))

	def clear(self):
		for key in self.keys:
			self.feature_data[key] = []

class TimeseriesFeatureQueue(object):
	"""
	Class for storing regularly sampled feature data.
	NOTE: assumes that ingested features are time ordered.

	Example:
		>>> # create the queue
		>>> columns = ['time', 'snr']
		>>> channels = ['channel1']
		>>> queue = TimeseriesFeatureQueue(channels, columns, sample_rate=1, buffer_size=1)
		>>> # add features
		>>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 3.0})
		>>> queue.append(123451, 'channel1', {'time': 123451.7, 'snr': 6.5})
		>>> queue.append(123452, 'channel1', {'time': 123452.4, 'snr': 5.2})
		>>> # get oldest feature
		>>> row = queue.pop()
		>>> row['timestamp']
		123450
		>>> row['features']['channel1']
		[{'snr': 3.0, 'time': 123450.3}]

	"""
	def __init__(self, channels, columns, **kwargs):
		self.channels = channels
		self.columns = columns
		self.sample_rate = kwargs.pop('sample_rate')
		self.buffer_size = kwargs.pop('buffer_size')
		self.out_queue = deque(maxlen = 5)
		self.in_queue = {}
		self.counter = Counter()
		self.last_timestamp = 0
		self.effective_latency = 2 # NOTE: set so that late features are not dropped

	def append(self, timestamp, channel, row):
		if timestamp > self.last_timestamp:
			### create new buffer if one isn't available for new timestamp
			if timestamp not in self.in_queue:
				self.in_queue[timestamp] = self._create_buffer()
			self.counter[timestamp] += 1

			### store row, aggregating if necessary
			idx = self._idx(row['time'])
			if not self.in_queue[timestamp][channel][idx] or (row['snr'] > self.in_queue[timestamp][channel][idx]['snr']):
				self.in_queue[timestamp][channel][idx] = row

			### check whether there are enough new samples that the oldest sample needs to be pushed
			if len(self.counter) > self.effective_latency:
				oldest_timestamp = min(self.counter.keys())
				self.last_timestamp = oldest_timestamp
				self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
				del self.counter[oldest_timestamp]

	def pop(self):
		if len(self):
			return self.out_queue.popleft()

	def flush(self):
		while self.in_queue:
			oldest_timestamp = min(self.counter.keys())
			del self.counter[oldest_timestamp]
			self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})

	def _create_buffer(self):
		return defaultdict(lambda: [None for ii in range(int(self.sample_rate * self.buffer_size))])

	def _idx(self, timestamp):
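		### map the feature time onto a sample index within the current buffer:
		### the fractional position of the timestamp within a buffer span,
		### scaled by the number of samples per buffer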
		return int(numpy.floor(((timestamp / self.buffer_size) % 1) * self.buffer_size * self.sample_rate))

	def __len__(self):
		return len(self.out_queue)

class ETGFeatureQueue(object):
	"""
	Class for storing feature data when the pipeline is running in ETG mode, i.e. reporting all triggers above an SNR threshold.
	NOTE: assumes that ingested features are time ordered.
	"""
	def __init__(self, channels, columns, **kwargs):
		self.channels = channels
		self.columns = columns
		self.out_queue = deque(maxlen = 5)
		self.in_queue = {}
		self.counter = Counter()
		self.last_timestamp = 0
		self.effective_latency = 2

	def append(self, timestamp, channel, row):
		if timestamp > self.last_timestamp:
			### create new buffer if one isn't available for new timestamp
			if timestamp not in self.in_queue:
				self.in_queue[timestamp] = self._create_buffer()
			self.counter[timestamp] += 1

			### store row
			self.in_queue[timestamp][channel].append(row)

			### check whether there are enough new samples that the oldest sample needs to be pushed
			if len(self.counter) > self.effective_latency:
				oldest_timestamp = min(self.counter.keys())
				self.last_timestamp = oldest_timestamp
				self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})
				del self.counter[oldest_timestamp]

	def pop(self):
		if len(self):
			return self.out_queue.popleft()

	def flush(self):
		while self.in_queue:
			oldest_timestamp = min(self.counter.keys())
			del self.counter[oldest_timestamp]
			self.out_queue.append({'timestamp': oldest_timestamp, 'features': self.in_queue.pop(oldest_timestamp)})

	def _create_buffer(self):
		return defaultdict(list)

	def __len__(self):
		return len(self.out_queue)
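
# A usage sketch for ETGFeatureQueue, mirroring the TimeseriesFeatureQueue example above
# (hypothetical channel and rows; all triggers for a given timestamp are kept rather than
# downsampled onto a regular grid):
#
#   >>> queue = ETGFeatureQueue(['channel1'], ['time', 'snr'])
#   >>> queue.append(123450, 'channel1', {'time': 123450.3, 'snr': 3.0})
#   >>> queue.append(123451, 'channel1', {'time': 123451.7, 'snr': 6.5})
#   >>> queue.append(123452, 'channel1', {'time': 123452.4, 'snr': 5.2})
#   >>> row = queue.pop()
#   >>> row['timestamp']
#   123450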