diff --git a/gstlal-burst/bin/gstlal_feature_hdf5_sink b/gstlal-burst/bin/gstlal_feature_hdf5_sink
index fe5b1ced45a2b4f429cf8bdcca26c9c69abcb32e..d15f910ca18549fba9b8b1053cf25f851b525150 100755
--- a/gstlal-burst/bin/gstlal_feature_hdf5_sink
+++ b/gstlal-burst/bin/gstlal_feature_hdf5_sink
@@ -57,6 +57,7 @@ def parse_command_line():
 	parser.add_option("--instrument", metavar = "string", default = "H1", help = "Sets the instrument for files written to disk. Default = H1")
 	parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
 	parser.add_option("--channel-list", type="string", metavar = "name", help = "Set the list of the channels to process. Command given as --channel-list=location/to/file")
+	parser.add_option("--sample-rate", type = "int", metavar = "Hz", default = 1, help = "Set the sample rate for feature timeseries output, must be a power of 2. Default = 1 Hz.")
 	parser.add_option("--write-cadence", type = "int", default = 100, help = "Rate at which the feature data is written to disk. Default = 100 seconds.")
 	parser.add_option("--persist-cadence", type = "int", default = 10000, help = "Rate at which new hdf5 files are written to disk. Default = 10000 seconds.")
 	parser.add_option("--processing-cadence", type = "float", default = 0.1, help = "Rate at which the synchronizer acquires and processes data. Default = 0.1 seconds.")
@@ -95,48 +96,46 @@ class HDF5StreamSink(object):
 		### initialize queues
 		self.feature_queue = deque(maxlen = 300)
 
+		### set up keys needed to do processing
+		self.keys = multichannel_datasource.channel_dict_from_channel_file(options.channel_list).keys()
+		### iDQ saving properties
+		self.rootdir = options.rootdir
+		self.timestamp = None
+		self.last_save_time = None
+		self.last_persist_time = None
+		self.sample_rate = options.sample_rate
 		self.write_cadence = options.write_cadence
+		self.persist_cadence = options.persist_cadence
 		self.tag = '%s-%s' % (options.instrument[:1], options.basename)
+		self.columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
+		self.feature_data = utils.HDF5TimeseriesFeatureData(self.columns, keys = self.keys, cadence = self.write_cadence, sample_rate = self.sample_rate)
 
-		# get base temp directory
+		### get base temp directory
 		if '_CONDOR_SCRATCH_DIR' in os.environ:
 			tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
 		else:
 			tmp_dir = os.environ['TMPDIR']
 
-		# set up keys needed to do processing
-		channel_dict = multichannel_datasource.channel_dict_from_channel_file(options.channel_list)
-		self.keys = {}
-		for channel in channel_dict.keys():
-			f_samp = int(channel_dict[channel]['fsamp'])
-			f_high = min(2048, f_samp)
-			f_low = min(32, f_high)
-			n_rates = int(numpy.log2(f_high/f_low) + 1)
-			rates = [f_low*2**i for i in range(n_rates)]
-			for rate in rates:
-				self.keys[(channel, rate)] = None
-
-		# hdf saving properties
-		self.rootdir = options.rootdir
-		self.write_cadence = options.write_cadence
-		self.persist_cadence = options.persist_cadence
-		self.last_save_time = {key:None for key in self.keys}
-		columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
-		self.feature_data = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.write_cadence)
-
-		self.feature_name = '%s-%d-5000000000' % (self.tag, int(aggregator.now()))
-
-		trigger_path = os.path.join(self.tag, self.tag+"-"+str(self.feature_name.split("-")[2])[:5], self.tag+"-0001")
-		self.feature_path = os.path.join(os.path.abspath(self.rootdir), trigger_path)
-		self.tmp_path = os.path.join(tmp_dir, trigger_path)
+	def set_hdf_file_properties(self, start_time, duration):
+		"""
+		Sets the file name, as well as the temporary and permanent directories where
+		triggers will live, given the current gps time and a gps duration.
+		Also takes care of creating new directories as needed and removing any leftover temporary files.
+		"""
+		# set/update file names and directories with new gps time and duration
+		job_id = '0001'
+		subset_id = '0001'
+		self.feature_name = os.path.splitext(utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
+		self.feature_path = utils.to_trigger_path(os.path.abspath(self.rootdir), self.basename, start_time, job_id, subset_id)
+		self.tmp_path = utils.to_trigger_path(self.tmp_dir, self.basename, start_time, job_id, subset_id)
 
 		# create temp and output directories if they don't exist
 		aggregator.makedir(self.feature_path)
 		aggregator.makedir(self.tmp_path)
 
 		# delete leftover temporary files
 		tmp_file = os.path.join(self.tmp_path, self.feature_name)+'.h5.tmp'
 		if os.path.isfile(tmp_file):
 			os.remove(tmp_file)
@@ -167,31 +166,33 @@ class HDF5StreamSink(object):
 		while self.feature_queue:
 			### remove data with oldest timestamp and process
-			timestamp, features = self.feature_queue.pop()
-			logger.info('processing features for timestamp %d' % timestamp)
+			self.timestamp, features = self.feature_queue.pop()
+			logger.info('processing features for timestamp %d' % self.timestamp)
 
-			for feature in features:
-				channel = feature['channel']
-				rate = feature['rate']
-				key = (channel, rate)
-
-				### set save times appropriately
-				if self.last_save_time[key] is None:
-					self.last_save_time[key] = timestamp
-
-				### save new dataset to disk every save cadence
-				if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.write_cadence):
-					logger.info('saving dataset to disk for timestamp %d' % timestamp)
-					self.feature_data.dump(self.tmp_path, self.feature_name, idq_utils.floor_div(self.last_save_time[key], self.write_cadence), key = key, tmp = True)
-					self.last_save_time[key] = timestamp
-
-				### create new file every persist cadence
-				if idq_utils.in_new_epoch(timestamp, self.last_save_time[key], self.persist_cadence):
-					logger.info('persisting file for range for gps range %d - %d' % (timestamp, timestamp-self.persist_cadence))
-					self.persist_to_disk()
-
-				### add new feature vector to dataset
-				self.feature_data.append(feature, key = key, buftime = timestamp)
+			# set save times and initialize specific saving properties if not already set
+			if self.last_save_time is None:
+				self.last_save_time = self.timestamp
+				self.last_persist_time = self.timestamp
+				duration = utils.floor_div(self.timestamp + self.persist_cadence, self.persist_cadence) - self.timestamp
+				self.set_hdf_file_properties(self.timestamp, duration)
+
+			# save triggers once per cadence if saving to disk
+			if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_save_time, self.write_cadence):
+				self.logger.info("saving features to disk at timestamp = %d" % self.timestamp)
+				self.feature_data.dump(self.tmp_path, self.feature_name, utils.floor_div(self.last_save_time, self.write_cadence), tmp = True)
+				self.to_hdf_file()
+				self.last_save_time = self.timestamp
+
+			# persist triggers once per persist cadence if using hdf5 format
+			if self.timestamp and utils.in_new_epoch(self.timestamp, self.last_persist_time, self.persist_cadence):
self.logger.info("persisting features to disk for gps range %d - %d" % (self.timestamp, self.timestamp-self.persist_cadence)) + self.persist_to_disk() + self.finish_hdf_file() + self.last_persist_time = self.timestamp + self.set_hdf_file_properties(self.timestamp, self.persist_cadence) + + ### add new feature vector to dataset + self.feature_data.append(timestamp, features) def persist_to_disk(self): """ @@ -235,7 +236,6 @@ class SignalHandler(object): signal.signal(sig, self) def __call__(self, signum, frame): - #print >>sys.stderr, "SIG %d received, attempting graceful shutdown..." % signum self.sink.stop() sys.exit(0) diff --git a/gstlal-burst/bin/gstlal_feature_synchronizer b/gstlal-burst/bin/gstlal_feature_synchronizer index e8360f14e24324987ab4f8ff0fdb55fab5814d37..13125e1de7ca9be9d2ac71d19eb0db632c76f77e 100755 --- a/gstlal-burst/bin/gstlal_feature_synchronizer +++ b/gstlal-burst/bin/gstlal_feature_synchronizer @@ -39,6 +39,8 @@ from lal import LIGOTimeGPS from confluent_kafka import Producer, Consumer, KafkaError +from gstlal.fxtools import utils + #------------------------------------------------- # Functions #------------------------------------------------- diff --git a/gstlal-burst/python/fxtools/feature_extractor.py b/gstlal-burst/python/fxtools/feature_extractor.py index fec979f207c97487bc0016175ce1696b6b9bcab2..4293d773f08cd881f13b0fcc1e9ea51e1dd6ba33 100644 --- a/gstlal-burst/python/fxtools/feature_extractor.py +++ b/gstlal-burst/python/fxtools/feature_extractor.py @@ -44,6 +44,8 @@ from gi.repository import GObject, Gst GObject.threads_init() Gst.init(None) +from confluent_kafka import Producer + import lal from lal import LIGOTimeGPS