Skip to content
Snippets Groups Projects
Commit 6fac00dc authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_feature_extractor: removed check in DataSourceInfo causing the pipeline...

gstlal_feature_extractor: removed check in DataSourceInfo causing the pipeline to not start up using live data, added an option for live data that allows features to be persisted to disk at a regular cadence, minor refactoring in MultiChannelHandler
parent 0dca5a19
No related branches found
No related tags found
No related merge requests found
......@@ -124,33 +124,30 @@ class MultiChannelHandler(simplehandler.Handler):
### iDQ saving properties
self.last_save_time = None
self.last_persist_time = None
self.cadence = options.cadence
self.persist_cadence = options.persist_cadence
# set whether data source is live
self.is_live = data_source_info.data_source in data_source_info.live_sources
# get base temp directory
if '_CONDOR_SCRATCH_DIR' in os.environ:
tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
self.tmp_dir = os.environ['_CONDOR_SCRATCH_DIR']
else:
tmp_dir = os.environ['TMPDIR']
self.tmp_dir = os.environ['TMPDIR']
# feature saving properties
if options.save_format == 'hdf5':
columns = ['start_time', 'stop_time', 'trigger_time', 'frequency', 'q', 'snr', 'phase', 'sigmasq', 'chisq']
self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
duration = int(options.gps_end_time) - int(options.gps_start_time)
self.fname = os.path.splitext(idq_utils.to_trigger_filename(self.basename, options.gps_start_time, duration, 'h5'))[0]
self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, options.gps_start_time, self.job_id, self.subset_id)
self.tmp_path = idq_utils.to_trigger_path(tmp_dir, self.basename, options.gps_start_time, self.job_id, self.subset_id)
# create temp and output directories if they don't exist
aggregator.makedir(self.fpath)
aggregator.makedir(self.tmp_path)
if self.is_live:
duration = idq_utils.floor_div(int(options.gps_start_time) + self.persist_cadence, self.persist_cadence) - int(options.gps_start_time)
else:
duration = int(options.gps_end_time) - int(options.gps_start_time)
# delete leftover temporary files
tmp_file = os.path.join(self.tmp_path, self.fname)+'.h5.tmp'
if os.path.isfile(tmp_file):
os.remove(tmp_file)
self.set_hdf_file_properties(int(options.gps_start_time), duration)
elif options.save_format == 'ascii':
# create header for trigger file
......@@ -231,6 +228,7 @@ class MultiChannelHandler(simplehandler.Handler):
# set save time appropriately
if self.last_save_time is None:
self.last_save_time = buftime
self.last_persist_time = buftime
# Save triggers once per cadence
if idq_utils.in_new_epoch(buftime, self.last_save_time, self.cadence) or (options.trigger_end_time and buftime == int(options.trigger_end_time)):
......@@ -243,6 +241,13 @@ class MultiChannelHandler(simplehandler.Handler):
self.fdata.append(self.header)
self.last_save_time = buftime
# persist triggers once per persist cadence if using hdf5 format and running with live data
if self.is_live and idq_utils.in_new_epoch(buftime, self.last_persist_time, self.persist_cadence) and options.save_format == 'hdf5':
logger.info("persisting features to disk at timestamp = %d" % buftime)
self.finish_hdf_file()
self.last_persist_time = buftime
self.set_hdf_file_properties(buftime, self.persist_cadence)
# read buffer contents
for i in range(buf.n_memory()):
memory = buf.peek_memory(i)
......@@ -338,6 +343,26 @@ class MultiChannelHandler(simplehandler.Handler):
tmp_path = os.path.join(self.tmp_path, self.fname)+".h5.tmp"
shutil.move(tmp_path, final_path)
def set_hdf_file_properties(self, start_time, duration):
	"""
	Update the hdf5 trigger file name and the temporary and permanent
	directory paths where triggers will live, given a gps start time
	and a gps duration (both in seconds).

	Sets self.fname, self.fpath and self.tmp_path in place (nothing is
	returned). Also creates the output directories if needed and removes
	any leftover temporary file at the new location.
	"""
	# set/update file names and directories with new gps time and duration;
	# strip the extension from the generated filename since '.h5'/'.h5.tmp'
	# suffixes are appended at the point of use
	self.fname = os.path.splitext(idq_utils.to_trigger_filename(self.basename, start_time, duration, 'h5'))[0]
	self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, start_time, self.job_id, self.subset_id)
	self.tmp_path = idq_utils.to_trigger_path(self.tmp_dir, self.basename, start_time, self.job_id, self.subset_id)

	# create temp and output directories if they don't exist
	aggregator.makedir(self.fpath)
	aggregator.makedir(self.tmp_path)

	# delete any leftover temporary file so stale partial output from a
	# previous (crashed) run is not mixed into the new trigger file
	tmp_file = os.path.join(self.tmp_path, self.fname) + '.h5.tmp'
	if os.path.isfile(tmp_file):
		os.remove(tmp_file)
def gen_psd_xmldoc(self):
xmldoc = lal.series.make_psd_xmldoc(self.psds)
process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_idq", {})
......@@ -482,6 +507,7 @@ def parse_command_line():
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("--persist-cadence", type = "int", default = 320, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Only used for live data, and needs to be a multiple of save cadence. Default = 320 seconds.")
parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
parser.add_option("--save-format", action = "store_true", default = "hdf5", help = "Specifies the save format (ascii or hdf5) of features written to disk. Default = hdf5")
......@@ -509,10 +535,14 @@ def parse_command_line():
options.trigger_start_time = options.gps_start_time
if options.trigger_end_time is None:
options.trigger_end_time = options.gps_end_time
elif options.data_source in ("framexmit", "lvshm"):
else:
options.gps_start_time = int(aggregator.now())
options.gps_end_time = 9999999999
# NOTE: set the gps end time to be "infinite"
options.gps_end_time = 2000000000
# check if persist and save cadence times are sensible
assert options.persist_cadence >= options.cadence
assert (options.persist_cadence % options.cadence) == 0
return options, filenames
......@@ -609,7 +639,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
logger.info("checkpoint: {0} of {1} files completed and continuing with channel subset {2}".format((subset_id - 1), len(data_source_info.channel_subsets), subset_id))
pass
logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
logger.info("processing channel subset %d of %d" % (subset_id, len(data_source_info.channel_subsets)))
#
# if web services serving up bottle routes are enabled,
......
......@@ -348,8 +348,6 @@ class DataSourceInfo(object):
if options.gps_start_time is not None:
if options.gps_end_time is None:
raise ValueError("must provide both --gps-start-time and --gps-end-time")
if options.data_source in self.live_sources:
raise ValueError("cannot set --gps-start-time or --gps-end-time with %s" % " or ".join("--data-source=%s" % src for src in sorted(self.live_sources)))
try:
start = LIGOTimeGPS(options.gps_start_time)
except ValueError:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment