Commit bd6e01b7 authored by Patrick Godwin

gstlal_feature_hdf5_sink: revamped code to take into account new changes with handling/writing features to disk in main feature extraction executable, fixed import issues in streaming feature extraction processes
parent e78030b7
@@ -57,6 +57,7 @@ def parse_command_line():
 	parser.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	parser.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
+	parser.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
 	parser.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
 	parser.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
 	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
@@ -95,48 +96,46 @@ class HDF5StreamSink(object):
 		### initialize queues
 		self.feature_queue = deque(maxlen = 300)
 
+		### set up keys needed to do processing
+		self.keys = multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys()
+
 		### iDQ saving properties
+		self.rootdir = options.rootdir
+		self.timestamp = None
+		self.last_save_time = None
+		self.last_persist_time = None
+		self.sample_rate = options.sample_rate
 		self.write_cadence = options.write_cadence
+		self.persist_cadence = options.persist_cadence
 		self.tag = '%s-%s' % (options.instrument[:1], options.basename)
+		self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
+		self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate)
 
-		# get base temp directory
+		### get base temp directory
 		if '_CONDOR_SCRATCH_DIR' in os.environ:
 			tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
 		else:
 			tmp_dir = os.environ['TMPDIR']
 
-		# set up keys needed to do processing
-		channel_dict = multichannel_datasource.channel_dict_from_channel_file(options.channel_list)
-		self.keys = {}
-		for channel in channel_dict.keys():
-			f_samp = int(channel_dict[channel]['fsamp'])
-			f_high = min(2048, f_samp)
-			f_low = min(32, f_high)
-			n_rates = int(numpy.log2(f_high/f_low) + 1)
-			rates = [f_low*2**i for i in range(n_rates)]
-			for rate in rates:
-				self.keys[(channel, rate)] = None
-
-		# hdf saving properties
-		self.rootdir = options.rootdir
-		self.write_cadence = options.write_cadence
-		self.persist_cadence = options.persist_cadence
-		self.last_save_time = {key:None for key in self.keys}
-		columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
-		self.feature_data = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.write_cadence)
-
-		self.feature_name = '%s-%d-5000000000' % (self.tag, int(aggregator.now()))
-		trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.feature_name.split("-")[2])[:5], self.tag+"-0001")
-		self.feature_path = os.path.join(os.path.abspath(self.rootdir), trigger_path)
-		self.tmp_path = os.path.join(tmp_dir, trigger_path)
+	def set_hdf_file_properties(self, start_time, duration):
+		"""
+		Returns the file name, as well as locations of temporary and permanent locations of
+		directories where triggers will live, when given the current gps time and a gps duration.
+		Also takes care of creating new directories as needed and removing any leftover temporary files.
+		"""
+		# set/update file names and directories with new gps time and duration
+		job_id = '0001'
+		subset_id = '0001'
+		self.feature_name = os.path.splitext(utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
+		self.feature_path = utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, start_time, job_id, subset_id)
+		self.tmp_path = utils.to_trigger_path(self.tmp_dir, self.basename, start_time, job_id, subset_id)
 
 		# create temp and output directories if they don't exist
-		aggregator.makedir(self.feature_path)
+		aggregator.makedir(self.fpath)
 		aggregator.makedir(self.tmp_path)
 
 		# delete leftover temporary files
-		tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
+		tmp_file = os.path.join(self.tmp_path, self.fname)+'.h5.tmp'
 		if os.path.isfile(tmp_file):
 			os.remove(tmp_file)
@@ -167,31 +166,33 @@ class HDF5StreamSink(object):
 		while self.feature_queue:
 			### remove data with oldest timestamp and process
-			timestamp, features = self.feature_queue.pop()
+			self.timestamp, features = self.feature_queue.pop()
 			logger.info('processing features for timestamp %d' % timestamp)
 
-			for feature in features:
-				channel = feature['channel']
-				rate = feature['rate']
-				key = (channel, rate)
-
-				### set save times appropriately
-				if self.last_save_time[key] is None:
-					self.last_save_time[key] = timestamp
-
-				### save new dataset to disk every save cadence
-				if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.write_cadence):
-					logger.info('saving dataset to disk for timestamp %d' % timestamp)
-					self.feature_data.dump(self.tmp_path, self.feature_name, idq_utils.floor_div(self.last_save_time[key], self.write_cadence), key = key, tmp = True)
-					self.last_save_time[key] = timestamp
-
-				### create new file every persist cadence
-				if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.persist_cadence):
-					logger.info('persisting file for range for gps range %d - %d' % (timestamp, timestamp-self.persist_cadence))
-					self.persist_to_disk()
-
-				### add new feature vector to dataset
-				self.feature_data.append(feature, key = key, buftime = timestamp)
+			# set save times and initialize specific saving properties if not already set
+			if self.last_save_time is None:
+				self.last_save_time = self.timestamp
+				self.last_persist_time = self.timestamp
+				duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
+				self.set_hdf_file_properties(self.timestamp, duration)
+
+			# Save triggers once per cadence if saving to disk
+			if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
+				self.logger.info("saving features to disk at timestamp = %d" % self.timestamp)
+				self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
+				self.to_hdf_file()
+				self.last_save_time = self.timestamp
+
+			# persist triggers once per persist cadence if using hdf5 format
+			if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
+				self.logger.info("persisting features to disk for gps range %d - %d" % (self.timestamp, self.timestamp-self.persist_cadence))
+				self.persist_to_disk()
+				self.finish_hdf_file()
+				self.last_persist_time = self.timestamp
+				self.set_hdf_file_properties(self.timestamp, self.persist_cadence)
+
+			### add new feature vector to dataset
+			self.feature_data.append(timestamp, features)
 
 	def persist_to_disk(self):
 		"""
@@ -235,7 +236,6 @@ class SignalHandler(object):
 			signal.signal(sig, self)
 
 	def __call__(self, signum, frame):
-		#print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum
 		self.sink.stop()
 		sys.exit(0)
...
@@ -39,6 +39,8 @@ from lal import LIGOTimeGPS
 from confluent_kafka import Producer, Consumer, KafkaError
 
+from gstlal.fxtools import utils
+
 #-------------------------------------------------
 # Functions
 #-------------------------------------------------
...
@@ -44,6 +44,8 @@ from gi.repository import GObject, Gst
 GObject.threads_init()
 Gst.init(None)
 
+from confluent_kafka import Producer
+
 import lal
 from lal import LIGOTimeGPS
...
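The added Producer import is what the streaming feature extraction process uses to publish feature packets to Kafka. A minimal sketch of that pattern with confluent_kafka; the broker address, topic name and payload layout here are assumptions for illustration:

    # Minimal confluent_kafka Producer usage, for illustration only; the broker
    # address, topic name and payload below are hypothetical.
    import json
    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def push_features(timestamp, features, topic='gstlal_features'):
        """Serialize one buffer of features and hand it to the Kafka producer."""
        producer.produce(topic, json.dumps({'timestamp': timestamp, 'features': features}))
        producer.poll(0)  # serve delivery callbacks without blocking

    producer.flush()  # drain any queued messages before shutdown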