#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a DAG to run offline gstlal_feature_extractor batch jobs.
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
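
# A sketch of a typical invocation. All paths, the channel list, and the GPS
# times below are illustrative placeholders; the data-source options themselves
# come from multichannel_datasource.append_options() and
# feature_extractor.append_options(), not from this file:
#
#   gstlal_feature_extractor_pipe \
#       --data-source frames \
#       --gps-start-time 1187000000 \
#       --gps-end-time 1187100000 \
#       --channel-list channel_list.txt \
#       --frame-cache frame.cache \
#       --out-path /path/to/features \
#       --save-format hdf5 \
#       --max-serial-streams 100 \
#       --max-parallel-streams 50 \
#       --concurrency 4
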
# =============================
#
# preamble
#
# =============================
import os
import optparse
import lal
from ligo import segments
from gstlal import aggregator
from gstlal import inspiral_pipe
from gstlal import dagparts as gstlaldagparts
from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils

# =============================
#
# functions
#
# =============================

def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
	"""
	Build a dictionary of analysis segments: pad the boundary segment so that
	the whitener can settle, then break it into chunks of at most segment_length.
	"""
segsdict = segments.segmentlistdict()
# start pad to allow whitener to settle + the maximum template_length
start_pad = multirate_datasource.PSD_DROP_TIME + max_template_length
segsdict[ifo] = segments.segmentlist([boundary_seg])
segsdict[ifo] = segsdict[ifo].protract(start_pad)
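	# break the padded segment into chunks no longer than segment_length,
	# overlapping adjacent chunks by start_pad (assumed breakupsegs semantics)
	# so that each job gets its own whitener settling time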
segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
if not segsdict[ifo]:
del segsdict[ifo]
return segsdict

def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
	"""
	Build a dictionary of gstlal_feature_extractor DAG nodes, keyed by
	(channel subset index, analysis segment).
	"""
feature_extractor_nodes = {}
# parallelize jobs by channel subsets
for ii, channel_subset in enumerate(data_source_info.channel_subsets):
print "Creating feature extractor jobs for channel subset %d" % ii
# parallelize jobs by segments
for seg in segsdict[ifo]:
# only produce jobs where the analysis runtime after applying segments is nonzero
if not data_source_info.frame_segments[ifo].intersects_segment(seg):
if options.verbose:
print " Skipping segment (%d, %d) for channel subset %d since there is no analyzable data here" % (int(seg[0]), int(seg[1]), ii)
continue
# set maximum number of jobs reading concurrently from the same frame file to prevent I/O locks
			if ii < options.concurrency:
dep_nodes = parent_nodes
else:
dep_nodes = [feature_extractor_nodes[(ii - options.concurrency, seg)]]
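			# e.g. with --concurrency 4: subsets 0-3 depend only on parent_nodes,
			# subset 4 waits on subset 0's node for this segment, subset 5 on
			# subset 1's node, and so on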
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
# FIXME: hacky way of getting options to get passed correctly for channels
channels[0] = channels[0].split('=')[1]
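			# the joined string is passed below as the value of the channel-name
			# opt, whose key supplies the --channel-name flag for the first entry,
			# so that entry's own prefix is stripped here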
outpath = os.path.join(options.out_path, "gstlal_feature_extractor")
# define analysis times
gps_start_time = int(seg[0])
feature_start_time = gps_start_time + multirate_datasource.PSD_DROP_TIME + max_template_length
feature_end_time = min(int(seg[1]), options.gps_end_time)
feature_extractor_nodes[(ii, seg)] = \
inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = dep_nodes,
opts = {"gps-start-time":gps_start_time,
"gps-end-time":feature_end_time,
"feature-start-time":feature_start_time,
"feature-end-time":feature_end_time,
"data-source":"frames",
"mismatch":options.mismatch,
"waveform":options.waveform,
"qhigh":options.qhigh,
"channel-name":' '.join(channels),
"job-id":str(ii + 1).zfill(4),
"cadence":options.cadence,
"persist-cadence":options.persist_cadence,
"max-streams":options.max_serial_streams,
"disable-web-service":options.disable_web_service,
"local-frame-caching":options.local_frame_caching,
"frame-segments-name": options.frame_segments_name,
"save-format": options.save_format,
"verbose":options.verbose
},
input_files = {"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file},
output_files = {"out-path":outpath}
)
if options.verbose:
print " Creating node for channel subset %d, gps range %d - %d" % (ii, feature_start_time, feature_end_time)
return feature_extractor_nodes

# =============================
#
# command line parser
#
# =============================
def parse_command_line():
parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)
# generic data source options
multichannel_datasource.append_options(parser)
feature_extractor.append_options(parser)
# DAG architecture options
parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks")
parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")
# Condor commands
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
parser.add_option("--request-disk", default = "50GB", metavar = "integer", help = "set the requested node local scratch space size needed, default = 50GB")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
options, filenames = parser.parse_args()
# set max parallel streams to options.max_streams for use in data_source_info for splitting up channel lists to process in parallel
options.max_streams = options.max_parallel_streams
# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check
# sanity check to enforce a minimum segment length
# Minimum segment length chosen so that the overlap is a ~33% hit in run time
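	# (with segment_length = 4 * PSD_DROP_TIME, roughly one quarter of each job
	# is whitener settling time, i.e. a ~33% hit relative to the analyzable span)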
min_segment_length = int(4 * multirate_datasource.PSD_DROP_TIME)
assert options.segment_length >= min_segment_length
return options, filenames

# =============================
#
# main
#
# =============================
#
# parsing and setting up core structures
#
options, filenames = parse_command_line()
data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()
# FIXME Work out better way to determine max template length
max_template_length = 30
#
# create directories if needed
#
listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")
#
# set up dag and job classes
#
dag = inspiral_pipe.DAG("feature_extractor_pipe")
condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
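# e.g. passing --condor-command=accounting_group=<your.tag> (an illustrative
# value) on the command line folds that command into the submit file built here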
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
#
# set up jobs
#
feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], segsdict, ifo, options, data_source_info, max_template_length=max_template_length)
#
# write out dag and sub files
#
dag.write_sub_files()
dag.write_dag()
dag.write_script()
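
# Once the sub files, DAG, and convenience script are written, the workflow
# would typically be submitted with HTCondor's DAGMan, e.g.:
#
#   condor_submit_dag feature_extractor_pipe.dag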