Commit 30a05cf1 authored by Patrick Godwin


gstlal_etg + related files: added the ability to process channels in parallel (via condor) as well as serially, and added tracking of jobs by job_id (parallel) and subset_id (serial) to reflect this. Added a few options to the Makefile and gstlal_etg_pipe to tune the level of serial/parallel processing.
parent 26d99771
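To make the scheme in the commit message concrete, here is a rough, hypothetical sketch of how a channel list gets cut into parallel condor jobs (job_id) and, within each job, into serial passes (subset_id); the helper name, channel names and stream counts are illustrative only, not the actual gstlal implementation:

# hypothetical sketch of the parallel/serial split; the stream count of a
# channel is the number of sample rates it is decomposed into
def split_by_streams(channels, rates, max_streams):
    """Greedily pack channels into subsets whose total stream count stays <= max_streams."""
    subsets, current, count = [], [], 0
    for chan in channels:
        n = rates[chan]
        if current and count + n > max_streams:
            subsets.append(current)
            current, count = [], 0
        current.append(chan)
        count += n
    if current:
        subsets.append(current)
    return subsets

channels = ['H1:EXAMPLE_CHANNEL_%d' % i for i in range(10)]   # hypothetical channel names
rates = dict((c, 6) for c in channels)                        # assume 6 sample rates per channel

# one condor job per parallel subset (job_id), one serial pass per serial subset (subset_id)
for job_id, job_channels in enumerate(split_by_streams(channels, rates, 30), 1):
    for subset_id, subset in enumerate(split_by_streams(job_channels, rates, 12), 1):
        print('job %s subset %s -> %d channels' % (str(job_id).zfill(4), str(subset_id).zfill(4), len(subset)))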
......@@ -119,6 +119,7 @@ class MultiChannelHandler(simplehandler.Handler):
# format id for aesthetics
self.job_id = str(kwargs.pop("job_id")).zfill(4)
self.subset_id = str(kwargs.pop("subset_id")).zfill(4)
### iDQ saving properties
self.last_save_time = None
......@@ -136,11 +137,10 @@ class MultiChannelHandler(simplehandler.Handler):
self.fdata = idq_utils.HDF5FeatureData(columns, keys = self.keys, cadence = self.cadence)
duration = int(options.gps_end_time) - int(options.gps_start_time)
#self.fname = '%s-%d-%d' % (self.tag, options.gps_start_time, duration)
self.fname = os.path.splitext(idq_utils.to_trigger_filename(self.basename, options.gps_start_time, duration, 'h5'))[0]
self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, options.gps_start_time, self.job_id)
self.tmp_path = idq_utils.to_trigger_path(tmp_dir, self.basename, options.gps_start_time, self.job_id)
self.fpath = idq_utils.to_trigger_path(os.path.abspath(self.out_path), self.basename, options.gps_start_time, self.job_id, self.subset_id)
self.tmp_path = idq_utils.to_trigger_path(tmp_dir, self.basename, options.gps_start_time, self.job_id, self.subset_id)
# create temp and output directories if they don't exist
aggregator.makedir(self.fpath)
......@@ -489,7 +489,7 @@ def parse_command_line():
parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
parser.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server url that the kafka topic is hosted on.")
parser.add_option("--job-id", type = "string", default = "0001", help = "Sets the job identication of the ETG with a 4 digit hex code, useful for running multiple instances. Default = 0001")
parser.add_option("--job-id", type = "string", default = "0001", help = "Sets the job identication of the ETG with a 4 digit integer code padded with zeros, useful for running multiple instances of the same pipeline in parallel. Default = 0001")
parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
parser.add_option("--trigger-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
......@@ -612,7 +612,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# get path where triggers are located
duration = int(options.gps_end_time) - int(options.gps_start_time)
fname = idq_utils.to_trigger_filename(basename, options.gps_start_time, duration, 'h5')
fpath = idq_utils.to_trigger_path(os.path.abspath(options.out_path), basename, options.gps_start_time, str(subset_id).zfill(4))
fpath = idq_utils.to_trigger_path(os.path.abspath(options.out_path), basename, options.gps_start_time, options.job_id, str(subset_id).zfill(4))
trg_file = os.path.join(fpath, fname)
# visit groups within a given hdf5 file
......@@ -748,7 +748,7 @@ for subset_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# define structures to synchronize output streams and extract triggers from buffer
logger.info("setting up pipeline handler...")
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, basename = basename, out_path = options.out_path, instrument = instrument, keys = src.keys(), frame_segments = data_source_info.frame_segments, job_id = subset_id)
handler = MultiChannelHandler(mainloop, pipeline, basis_params = basis_params, basename = basename, out_path = options.out_path, instrument = instrument, keys = src.keys(), frame_segments = data_source_info.frame_segments, job_id = options.job_id, subset_id = subset_id)
logger.info("attaching appsinks to pipeline...")
appsync = LinkedAppSync(appsink_new_buffer = handler.bufhandler)
......
......@@ -53,7 +53,6 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
lsctables.use_in(LIGOLWContentHandler)
#
# get a dictionary of all the segments
#
......@@ -90,17 +89,16 @@ def breakupsegs(seglist, maxextent, overlap):
newseglist.extend(breakupseg(bigseg, maxextent, overlap))
return newseglist
def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 30):
def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
segsdict = segments.segmentlistdict()
# 512 seconds for the whitener to settle + the maximum template_length
start_pad = idq_multirate_datasource.PSD_DROP_TIME + max_template_length
# Chosen so that the overlap is only a ~5% hit in run time for long segments...
segment_length = int(10 * start_pad)
segsdict[ifo] = segments.segmentlist([boundary_seg])
segsdict[ifo] = segsdict[ifo].protract(start_pad)
# FIXME revert to gstlaldagparts.breakupsegs and remove above two functions when we no longer write to ascii.
segsdict[ifo] = breakupsegs(segsdict[ifo], segment_length, start_pad)
#segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
if not segsdict[ifo]:
del segsdict[ifo]
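As a standalone illustration of the segmenting above (assuming PSD_DROP_TIME = 512 s per the whitener comment and the default max_template_length = 30 s; plain (start, stop) tuples stand in for the segments library, and the breakup helper is only a rough stand-in for breakupseg):

PSD_DROP_TIME = 512                  # assumed value: time for the whitener to settle
max_template_length = 30
start_pad = PSD_DROP_TIME + max_template_length   # 542 s of padding / overlap
segment_length = 6000                # default --segment-length

def breakup(seg, maxextent, overlap):
    # split (start, stop) into chunks of at most maxextent seconds,
    # each overlapping its neighbour by `overlap` seconds
    start, stop = seg
    chunks = []
    while stop - start > maxextent:
        chunks.append((start, start + maxextent))
        start += maxextent - overlap
    chunks.append((start, stop))
    return chunks

boundary_seg = (1187000000, 1187020000)                               # 20000 s of data, hypothetical
padded = (boundary_seg[0] - start_pad, boundary_seg[1] + start_pad)   # the protract() step
for chunk in breakup(padded, segment_length, start_pad):
    print('%d - %d (%d s)' % (chunk[0], chunk[1], chunk[1] - chunk[0]))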
......@@ -114,35 +112,50 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
etg_nodes = {}
trig_start = options.gps_start_time
# parallelize jobs by segments
for seg in segsdict[ifo]:
outpath = os.path.join(options.out_path, "gstlal_etg")
etg_nodes[seg] = \
inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
opts = {"gps-start-time":int(seg[0]),
"gps-end-time":int(seg[1]),
"trigger-start-time":int(trig_start),
"trigger-end-time":int(seg[1]),
"data-source":"frames",
"mismatch":options.mismatch,
"qhigh":options.qhigh,
"channel-list":options.channel_list,
"cadence":options.cadence,
"disable-web-service":options.disable_web_service,
"local-frame-caching":options.local_frame_caching,
"frame-segments-name": options.frame_segments_name,
"save-hdf": options.save_hdf,
"verbose":options.verbose
},
input_files = {"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file},
output_files = {"out-path":outpath}
)
if options.verbose:
print "Creating node for seg %s" % repr(seg)
trig_start = int(seg[1])
# parallelize jobs by channel subsets
for job_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
# format job id correctly
job_id = str(job_id).zfill(4)
# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
# FIXME: hacky way of getting the channel-name options passed through correctly
channels[0] = channels[0].split('=')[1]
outpath = os.path.join(options.out_path, "gstlal_etg")
etg_nodes[seg] = \
inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
opts = {"gps-start-time":int(seg[0]),
"gps-end-time":int(seg[1]),
"trigger-start-time":int(trig_start),
"trigger-end-time":int(seg[1]),
"data-source":"frames",
"mismatch":options.mismatch,
"qhigh":options.qhigh,
"channel-name":' '.join(channels),
"job-id":job_id,
"cadence":options.cadence,
"max-streams":options.max_serial_streams,
"disable-web-service":options.disable_web_service,
"local-frame-caching":options.local_frame_caching,
"frame-segments-name": options.frame_segments_name,
"save-hdf": options.save_hdf,
"verbose":options.verbose
},
input_files = {"frame-cache":options.frame_cache,
"frame-segments-file":options.frame_segments_file},
output_files = {"out-path":outpath}
)
if options.verbose:
print "Creating node for seg %s" % repr(seg)
trig_start = int(seg[1])
return etg_nodes
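For reference, the --channel-name formatting inside the loop above produces entries like the following; the channel names and sample rates are hypothetical, and the first entry presumably drops its option prefix so that generic_node's own --channel-name flag is not duplicated (the "hacky" FIXME above):

# hypothetical subset of channel_dict as held by data_source_info
channel_dict = {
    'H1:PEM-CS_ACC_EXAMPLE_DQ': {'fsamp': 4096.0},
    'H1:ASC-AS_EXAMPLE_OUT_DQ': {'fsamp': 2048.0},
}
channel_subset = sorted(channel_dict)

channels = [''.join(["--channel-name=", ':'.join([channel, str(int(channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
channels[0] = channels[0].split('=')[1]   # strip the prefix from the first entry only

print(' '.join(channels))
# H1:ASC-AS_EXAMPLE_OUT_DQ:2048 --channel-name=H1:PEM-CS_ACC_EXAMPLE_DQ:4096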
......@@ -162,20 +175,31 @@ def parse_command_line():
parser.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.")
parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
parser.add_option("-t", "--threads", type = "float", default = 50, help = "Number of threads to process per node. Default = 50.")
parser.add_option("-m", "--mismatch", type = "float", default = 0.05, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.05.")
parser.add_option("-q", "--qhigh", type = "float", default = 100, help = "Q high value for half sine-gaussian waveforms. Default = 100.")
parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")
parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
parser.add_option("--out-path", metavar = "path", default = ".", help = "Write to this path. Default = .")
# Condor commands
parser.add_option("--request-cpu", default = "4", metavar = "integer", help = "set the requested node CPU count, default = 4")
parser.add_option("--request-memory", default = "3GB", metavar = "integer", help = "set the requested node memory, default = 3GB")
parser.add_option("--request-cpu", default = "2", metavar = "integer", help = "set the requested node CPU count, default = 2")
parser.add_option("--request-memory", default = "8GB", metavar = "integer", help = "set the requested node memory, default = 8GB")
parser.add_option("--condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value; can be given multiple times")
options, filenames = parser.parse_args()
# set options.max_streams to the maximum number of parallel streams; used by data_source_info to split the channel list into subsets processed in parallel
options.max_streams = options.max_parallel_streams
# sanity check to enforce a minimum segment length
# Minimum segment length chosen so that the overlap is a ~33% hit in run time
min_segment_length = int(4 * idq_multirate_datasource.PSD_DROP_TIME)
assert options.segment_length >= min_segment_length
return options, filenames
#
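A quick arithmetic check of the enforced minimum above (assuming PSD_DROP_TIME = 512 s, per the whitener comment in analysis_segments; the exact accounting of the overlap may differ from the real pipeline):

PSD_DROP_TIME = 512                                # assumed whitener settling time
max_template_length = 30
start_pad = PSD_DROP_TIME + max_template_length    # 542 s of overlap per segment

min_segment_length = int(4 * PSD_DROP_TIME)        # 2048 s, the enforced minimum
overhead = float(start_pad) / (min_segment_length - start_pad)
print('minimum segment length: %d s' % min_segment_length)
print('overlap overhead at the minimum: ~%d%%' % round(100 * overhead))   # ~36%, i.e. roughly the ~33% quoted above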
......@@ -218,7 +242,7 @@ dag = inspiral_pipe.DAG("etg_trigger_pipe")
gstlalETGJob = inspiral_pipe.generic_job("gstlal_etg", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, options.segment_length, max_template_length=max_template_length)
#
# ETG jobs
......
......@@ -101,14 +101,14 @@ def floor_div(x, n):
assert n > 0
return (x / n) * n
def to_trigger_path(rootdir, basename, start_time, job_id):
def to_trigger_path(rootdir, basename, start_time, job_id, subset_id):
"""!
Given a rootdir, basename, start_time, job_id, and subset_id, returns a
path pointing to a directory structure of the form:
${rootdir}/${basename}/${basename}-${start_time_mod1e5}/${basename}-${job_id}/
${rootdir}/${basename}/${basename}-${start_time_mod1e5}/${basename}-${job_id}-${subset_id}/
"""
start_time_mod1e5 = str(start_time)[:5]
return os.path.join(rootdir, basename, '-'.join([basename, start_time_mod1e5]), '-'.join([basename, job_id]))
return os.path.join(rootdir, basename, '-'.join([basename, start_time_mod1e5]), '-'.join([basename, job_id, subset_id]))
def to_trigger_filename(basename, start_time, duration, suffix, tmp=False):
"""!
......
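To make the new directory layout concrete, a small self-contained example of what to_trigger_path now produces (the output root, basename and GPS time are hypothetical):

import os

def to_trigger_path(rootdir, basename, start_time, job_id, subset_id):
    # same logic as above, repeated so the example runs on its own
    start_time_mod1e5 = str(start_time)[:5]
    return os.path.join(rootdir, basename, '-'.join([basename, start_time_mod1e5]), '-'.join([basename, job_id, subset_id]))

print(to_trigger_path('/path/to/triggers', 'H1-GSTLAL_IDQ_TRIGGERS', 1187000000, '0003', '0002'))
# /path/to/triggers/H1-GSTLAL_IDQ_TRIGGERS/H1-GSTLAL_IDQ_TRIGGERS-11870/H1-GSTLAL_IDQ_TRIGGERS-0003-0002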
......@@ -19,8 +19,18 @@ FSTART=$(shell echo $$((${START}-${SEG_PAD})))
STOP = 1187100000
OUTPATH = $(PWD)
# Target number of streams (N_channels x N_rates_per_channel) that each cpu will process
N_STREAMS = 50
# NOTE: * if max_serial_streams > max_parallel_streams, all jobs will be parallelized by channel
# * if max_parallel_streams > num_channels in channel list, all jobs will be processed serially, with processing driven by max_serial_streams
# * any other combination will produce a mix of parallelization by channels and processing channels serially per job
MAX_PARALLEL_STREAMS = 50
MAX_SERIAL_STREAMS = 100
# length of time to process for a given job
SEGMENT_LENGTH = 6000
# Parameter space config of half-sine-gaussians
MISMATCH = 0.05
QHIGH = 100
......@@ -81,10 +91,12 @@ dag : frame.cache plots channel_list.txt segments.xml.gz
--frame-cache frame.cache \
--frame-segments-file segments.xml.gz \
--frame-segments-name datasegments \
--channel-list channel_list.txt \
--local-frame-caching \
--channel-list channel_list.txt \
--out-path $(OUTPATH) \
--max-streams $(N_STREAMS)\
--max-serial-streams $(MAX_SERIAL_STREAMS) \
--max-parallel-streams $(MAX_PARALLEL_STREAMS) \
--segment-length $(SEGMENT_LENGTH) \
--mismatch $(MISMATCH) \
--qhigh $(QHIGH) \
$(CONDOR_COMMANDS) \
......