Commit d26d56c9 authored by Patrick Godwin

gstlal_feature_extractor_pipe: clean up and refactor code for maintainability.

feature_extractor.py: change the default persist cadence from 200 s to 2000 s to save on I/O downstream.
parent 555c078a
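The I/O claim in the commit message is easy to quantify (illustrative arithmetic only, not taken from the commit): at the old 200 s persist cadence each job writes hundreds of files per day of data; at 2000 s, a tenth of that.

# illustrative arithmetic only: persisted files per job per day of data
seconds_per_day = 86400
print(seconds_per_day // 200)    # old default -> 432 files/day
print(seconds_per_day // 2000)   # new default -> 43 files/day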
gstlal_feature_extractor_pipe
@@ -17,95 +17,60 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
 """
-This program makes a dag to run gstlal_feature_extractor offline
+This program makes a dag to run offline gstlal_feature_extractor batch jobs
 """
 
 __author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
 
-##############################################################################
-# import standard modules and append the lalapps prefix to the python path
-import sys, os, stat
-import itertools
-import numpy
-import math
-from optparse import OptionParser
+# =============================
+#
+# preamble
+#
+# =============================
 
-##############################################################################
-# import the modules we need to build the pipeline
+import os
+import optparse
+
 import lal
-import lal.series
-from lal.utils import CacheEntry
-from glue import pipeline
-from glue.lal import Cache
 from ligo import segments
-from glue.ligolw import ligolw
-from glue.ligolw import lsctables
-import glue.ligolw.utils as ligolw_utils
-import glue.ligolw.utils.segments as ligolw_segments
-from gstlal import inspiral, inspiral_pipe
+
+from gstlal import aggregator
+from gstlal import inspiral_pipe
 from gstlal import dagparts as gstlaldagparts
-from gstlal import datasource
 
+from gstlal.fxtools import feature_extractor
 from gstlal.fxtools import multichannel_datasource
 from gstlal.fxtools import multirate_datasource
 from gstlal.fxtools import utils
 
-class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
-	pass
-lsctables.use_in(LIGOLWContentHandler)
-
+# =============================
 #
-# get a dictionary of all the segments
+# functions
 #
+# =============================
 
-def breakupseg(seg, maxextent, overlap):
-	if maxextent <= 0:
-		raise ValueError, "maxextent must be positive, not %s" % repr(maxextent)
-
-	# Simple case of only one segment
-	if abs(seg) < maxextent:
-		return segments.segmentlist([seg])
-
-	seglist = segments.segmentlist()
-	end = seg[1]
-	while abs(seg):
-		if (seg[0] + maxextent + overlap) < end:
-			# Round down segment gps end time to integer multiple of cadence.
-			seglist.append(segments.segment(seg[0], utils.floor_div(int(seg[0]) + maxextent + overlap, options.cadence)))
-			seg = segments.segment(seglist[-1][1] - overlap, seg[1])
-		else:
-			seglist.append(segments.segment(seg[0], end))
-			break
-
-	return seglist
-
-def breakupsegs(seglist, maxextent, overlap):
-	newseglist = segments.segmentlist()
-	for bigseg in seglist:
-		newseglist.extend(breakupseg(bigseg, maxextent, overlap))
-
-	return newseglist
-
 def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
+	"""
+	get a dictionary of all the analysis segments
+	"""
 	segsdict = segments.segmentlistdict()
 
-	# 512 seconds for the whitener to settle + the maximum template_length
+	# start pad to allow whitener to settle + the maximum template_length
	start_pad = multirate_datasource.PSD_DROP_TIME + max_template_length
 
 	segsdict[ifo] = segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segsdict[ifo].protract(start_pad)
-	# FIXME revert to gstlaldagparts.breakupsegs and remove above two functions when we no longer write to ascii.
-	segsdict[ifo] = breakupsegs(segsdict[ifo], segment_length, start_pad)
-	#segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
+	segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
 	if not segsdict[ifo]:
 		del segsdict[ifo]
 
 	return segsdict
 
-#
-# get a dictionary of all the channels per gstlal_feature_extractor job
-#
-
-def feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
+def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
+	"""
+	get a dictionary of all the channels per gstlal_feature_extractor job
+	"""
 	feature_extractor_nodes = {}
 
 	# parallelize jobs by channel subsets
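The hunk above drops the local breakupseg/breakupsegs helpers (kept only while features were still written to ascii, per the deleted FIXME) in favor of the library's gstlaldagparts.breakupsegs. A minimal sketch of the segment bookkeeping, assuming ligo.segments is installed, taking PSD_DROP_TIME = 512 from the deleted comment, and using hypothetical GPS times:

from ligo import segments

start_pad = 512 + 30                                  # PSD drop time + max template length
boundary = segments.segment(1000000000, 1000012000)   # hypothetical 12000 s analysis span
seglist = segments.segmentlist([boundary]).protract(start_pad)
# gstlaldagparts.breakupsegs(seglist, 6000, start_pad) then splits the padded span
# into chunks of roughly segment_length seconds that overlap by start_pad, so each
# job has enough leading data for its whitener to settle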
@@ -136,7 +101,7 @@ def feature_extractor_node_gen
 			trig_start = int(seg[0]) + multirate_datasource.PSD_DROP_TIME + max_template_length
 
 			feature_extractor_nodes[(ii, seg)] = \
-				inspiral_pipe.generic_node(gstlalFeatureExtractorJob, dag, parent_nodes = dep_nodes,
+				inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = dep_nodes,
 					opts = {"gps-start-time":int(seg[0]),
 						"gps-end-time":int(seg[1]),
 						"feature-start-time":int(trig_start),
@@ -165,33 +130,24 @@ def feature_extractor_node_gen
 	return feature_extractor_nodes
 
+# =============================
 #
-# Main
+# command line parser
 #
+# =============================
 
 def parse_command_line():
-	parser = OptionParser(usage = '%prog [options]', description = __doc__)
+	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
 
 	# generic data source options
 	multichannel_datasource.append_options(parser)
+	feature_extractor.append_options(parser)
 
-	# trigger generation options
-	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
-	parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDS of aux channels.")
-	parser.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.")
-	parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
-	parser.add_option("--save-format", action = "store_true", default = "hdf5", help = "Specifies the save format (ascii or hdf5) of features written to disk. Default = hdf5")
-	parser.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.")
-	parser.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk. Default = 200 seconds.")
-	parser.add_option("-m", "--mismatch", type = "float", default = 0.05, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.05.")
-	parser.add_option("-q", "--qhigh", type = "float", default = 100, help = "Q high value for half sine-gaussian waveforms. Default = 100.")
+	# DAG architecture options
 	parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
 	parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
 	parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks")
 	parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")
-	parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
-	parser.add_option("--waveform", metavar = "string", default = "half_sine_gaussian", help = "Specifies the waveform used for matched filtering. Possible options: (half_sine_gaussian, sine_gaussian). Default = half_sine_gaussian")
-	parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
 
 	# Condor commands
 	parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
@@ -211,59 +167,54 @@ def parse_command_line():
 	min_segment_length = int(4 * multirate_datasource.PSD_DROP_TIME)
 	assert options.segment_length >= min_segment_length
 
 	return options, filenames
 
+# =============================
 #
-# Useful variables
+# main
 #
+# =============================
 
-options, filenames = parse_command_line()
-
-output_dir = "plots"
-
-listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
-if not os.path.exists(listdir):
-	os.makedirs(listdir)
-
 #
+# parsing and setting up core structures
 #
-#
+
+options, filenames = parse_command_line()
 
 data_source_info = multichannel_datasource.DataSourceInfo(options)
 ifo = data_source_info.instrument
 channels = data_source_info.channel_dict.keys()
-boundary_seg = data_source_info.seg
 
 # FIXME Work out better way to determine max template length
 max_template_length = 30
 
 #
-# Setup the dag
+# create directories if needed
 #
 
-try:
-	os.mkdir("logs")
-except:
-	pass
-dag = inspiral_pipe.DAG("feature_extractor_pipe")
+listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
+aggregator.makedir(listdir)
+aggregator.makedir("logs")
 
 #
-# setup the job classes
+# set up dag and job classes
 #
 
-gstlalFeatureExtractorJob = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "request_disk":options.request_disk, "want_graceful_removal":"True", "kill_sig":"15"}))
+dag = inspiral_pipe.DAG("feature_extractor_pipe")
 
-segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, options.segment_length, max_template_length=max_template_length)
+condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
+condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
+feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
+
+segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
 
 #
-# feature extractor jobs
+# set up jobs
 #
 
-feature_extractor_nodes = feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, [], segsdict, ifo, options, data_source_info)
+feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], segsdict, ifo, options, data_source_info, max_template_length=max_template_length)
 
 #
-# all done
+# write out dag and sub files
 #
 
 dag.write_sub_files()
...
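Two small behavioral notes on this hunk: the new condor_options dict no longer passes request_disk, and the bare try/os.mkdir/except pattern is replaced by aggregator.makedir. The gstlal implementation of that helper is not shown in this commit; presumably it reduces to something like this sketch (an assumption, not the actual code):

import os

def makedir(path):
    # create the directory tree, tolerating an already-existing directory
    try:
        os.makedirs(path)
    except OSError:
        pass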
feature_extractor.py
@@ -542,7 +542,7 @@ def append_options(parser):
 	group.add_option("--feature-mode", metavar = "string", default = "timeseries", help = "Specifies the mode for which features are generated (timeseries/etg). Default = timeseries")
 	group.add_option("--data-transfer", metavar = "string", default = "table", help = "Specifies the format of features transferred over-the-wire (table/row). Default = table")
 	group.add_option("--cadence", type = "int", default = 20, help = "Rate at which to write trigger files to disk. Default = 20 seconds.")
-	group.add_option("--persist-cadence", type = "int", default = 200, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 200 seconds.")
+	group.add_option("--persist-cadence", type = "int", default = 2000, help = "Rate at which to persist trigger files to disk, used with hdf5 files. Needs to be a multiple of save cadence. Default = 2000 seconds.")
 	parser.add_option_group(group)
 
 	group = optparse.OptionGroup(parser, "Kafka Options", "Adjust settings used for pushing extracted features to a Kafka topic.")
...
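The help text requires the persist cadence to be a multiple of the save cadence, and the new defaults (20 s save, 2000 s persist) satisfy it: each hdf5 file now aggregates 100 write cycles instead of 10. A quick illustrative check:

cadence, persist_cadence = 20, 2000     # defaults after this commit
assert persist_cadence % cadence == 0, "persist-cadence must be a multiple of cadence"
print(persist_cadence // cadence)       # 100 write cycles per persisted file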