#!/usr/bin/env python
#
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""
This program makes a dag to run offline gstlal_feature_extractor batch jobs
"""

__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'

# =============================
#
# preamble
#
# =============================

import os
import optparse

import lal
from ligo import segments

from gstlal import aggregator
from gstlal import inspiral_pipe
from gstlal import dagparts as gstlaldagparts

from gstlal.fxtools import feature_extractor
from gstlal.fxtools import multichannel_datasource
from gstlal.fxtools import multirate_datasource
from gstlal.fxtools import utils

# =============================
#
# functions
#
# =============================

def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
	"""
	get a dictionary of all the analysis segments
	"""
	segsdict = segments.segmentlistdict()

	# start pad to allow whitener to settle + the maximum template_length
	start_pad = multirate_datasource.PSD_DROP_TIME + max_template_length

	segsdict[ifo] = segments.segmentlist([boundary_seg])
	segsdict[ifo] = segsdict[ifo].protract(start_pad)
	segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
	if not segsdict[ifo]:
		del segsdict[ifo]

	return segsdict
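
# Illustration only (not part of the original pipeline; GPS times below are
# made up): for a single-IFO boundary segment and the default segment_length
# of 6000 s, analysis_segments() protracts the boundary by the start pad
# (multirate_datasource.PSD_DROP_TIME plus the maximum template length) and
# hands it to gstlaldagparts.breakupsegs(), which is expected to return a
# segments.segmentlistdict keyed by IFO whose chunks overlap by the start pad:
#
#	segsdict = analysis_segments('H1', allsegs, segments.segment(1000000000, 1000012000), 6000)
#	# segsdict['H1'] -> segmentlist of overlapping chunks, each nominally at most segment_length seconds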
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
	"""
	get a dictionary of all the channels per gstlal_feature_extractor job
	"""
	feature_extractor_nodes = {}

	# parallelize jobs by channel subsets
	for ii, channel_subset in enumerate(data_source_info.channel_subsets):

		print "Creating feature extractor jobs for channel subset %d" % ii

		# parallelize jobs by segments
		for seg in segsdict[ifo]:

			# only produce jobs where the analysis runtime after applying segments is nonzero
			if not data_source_info.frame_segments[ifo].intersects_segment(seg):
				if options.verbose:
					print " Skipping segment (%d, %d) for channel subset %d since there is no analyzable data here" % (int(seg[0]), int(seg[1]), ii)
				continue

			# set maximum number of jobs reading concurrently from the same frame file to prevent I/O locks
			if ii / options.concurrency == 0:
				dep_nodes = parent_nodes
			else:
				dep_nodes = [feature_extractor_nodes[(ii - options.concurrency, seg)]]

			# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
			channels = [''.join(["--channel-name=", ':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
			# FIXME: hacky way of getting options to get passed correctly for channels
			channels[0] = channels[0].split('=')[1]

			outpath = os.path.join(options.out_path, "gstlal_feature_extractor")

			# define analysis times
			gps_start_time = int(seg[0])
			feature_start_time = gps_start_time + multirate_datasource.PSD_DROP_TIME + max_template_length
			feature_end_time = min(int(seg[1]), options.gps_end_time)

			feature_extractor_nodes[(ii, seg)] = \
				inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = dep_nodes,
					opts = {"gps-start-time":gps_start_time,
						"gps-end-time":feature_end_time,
						"feature-start-time":feature_start_time,
						"feature-end-time":feature_end_time,
						"data-source":"frames",
						"mismatch":options.mismatch,
						"waveform":options.waveform,
						"qhigh":options.qhigh,
						"channel-name":' '.join(channels),
						"job-id":str(ii + 1).zfill(4),
						"cadence":options.cadence,
						"persist-cadence":options.persist_cadence,
						"max-streams":options.max_serial_streams,
						"disable-web-service":options.disable_web_service,
						"local-frame-caching":options.local_frame_caching,
						"frame-segments-name": options.frame_segments_name,
						"save-format": options.save_format,
						"verbose":options.verbose
					},
					input_files = {"frame-cache":options.frame_cache, "frame-segments-file":options.frame_segments_file},
					output_files = {"out-path":outpath}
				)

			if options.verbose:
				print " Creating node for channel subset %d, gps range %d - %d" % (ii, feature_start_time, feature_end_time)

	return feature_extractor_nodes

# =============================
#
# command line parser
#
# =============================

def parse_command_line():
	parser = optparse.OptionParser(usage = '%prog [options]', description = __doc__)

	# generic data source options
	multichannel_datasource.append_options(parser)
	feature_extractor.append_options(parser)

	# DAG architecture options
	parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
	parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
	parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks. Default = 4.")
	parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")

	# Condor commands
	parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
	parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
	parser.add_option("--request-disk", default = "50GB", metavar = "integer", help = "set the requested node local scratch space size needed, default = 50GB")
	parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")

	options, filenames = parser.parse_args()

	# set max parallel streams to options.max_streams for use in data_source_info for splitting up channel lists to process in parallel
	options.max_streams = options.max_parallel_streams

	# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check

	# sanity check to enforce a minimum segment length
	# Minimum segment length chosen so that the overlap is a ~33% hit in run time
	min_segment_length = int(4 * multirate_datasource.PSD_DROP_TIME)
	assert options.segment_length >= min_segment_length

	return options, filenames
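
# A minimal sketch of how this dag generator might be invoked (program name,
# paths, and values are hypothetical; the data-source and channel-selection
# options are supplied by multichannel_datasource.append_options() and
# feature_extractor.append_options() rather than defined in this file):
#
#	gstlal_feature_extractor_pipe \
#		--frame-cache H1_raw_frames.cache \
#		--gps-end-time 1187100000 \
#		--out-path /path/to/features \
#		--max-parallel-streams 50 \
#		--max-serial-streams 100 \
#		--concurrency 4 \
#		--segment-length 6000 \
#		--request-cpu 2 \
#		--request-memory 8GB \
#		...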
# =============================
#
# main
#
# =============================

#
# parsing and setting up core structures
#

options, filenames = parse_command_line()

data_source_info = multichannel_datasource.DataSourceInfo(options)
ifo = data_source_info.instrument
channels = data_source_info.channel_dict.keys()

# FIXME Work out better way to determine max template length
max_template_length = 30

#
# create directories if needed
#

listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")

#
# set up dag and job classes
#

dag = inspiral_pipe.DAG("feature_extractor_pipe")

condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)

segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)

#
# set up jobs
#

feature_extractor_nodes = feature_extractor_node_gen(feature_extractor_job, dag, [], segsdict, ifo, options, data_source_info, max_template_length=max_template_length)

#
# write out dag and sub files
#

dag.write_sub_files()
dag.write_dag()
dag.write_script()
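
# Submission is not handled here: the calls above are expected to write the
# .sub files, a "feature_extractor_pipe.dag" (name assumed from the
# inspiral_pipe.DAG constructor argument), and a companion shell script into
# the working directory. The workflow would then typically be submitted with
# HTCondor, e.g.
#
#	condor_submit_dag feature_extractor_pipe.dag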