Commit a82176fa authored by Patrick Godwin

gstlal_etg_pipe + related Makefile: added concurrency option to limit the amount of concurrent I/O frame reads when submitting condor jobs
parent 638a3c71
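The throttle is implemented with HTCondor DAG dependencies rather than a runtime lock: for each segment, the node for channel subset ii is made a child of the node for subset ii - concurrency, so no more than concurrency jobs read the same frame files at once. A minimal sketch of the indexing, using a hypothetical chain_dependencies helper that is not part of the pipeline:

    def chain_dependencies(num_subsets, concurrency):
        """Hypothetical helper (not in gstlal): map each channel-subset
        index to the job indices it must wait for under the I/O throttle."""
        deps = {}
        for ii in range(num_subsets):
            if ii // concurrency == 0:
                # the first `concurrency` jobs depend only on the DAG's parent nodes
                deps[ii] = []
            else:
                # every later job waits on the job `concurrency` slots earlier,
                # capping simultaneous readers of the same frames at `concurrency`
                deps[ii] = [ii - concurrency]
        return deps

    # With 10 channel subsets and the default concurrency of 4, subsets 0-3
    # start immediately; subset 4 waits on 0, 5 on 1, ..., 9 on 5.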
gstlal_etg_pipe
@@ -112,14 +112,17 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 	etg_nodes = {}
 	trig_start = options.gps_start_time
 
-	# parallelize jobs by segments
-	for seg in segsdict[ifo]:
-		# parallelize jobs by channel subsets
-		for job_id, channel_subset in enumerate(data_source_info.channel_subsets, 1):
-			# format job id correctly
-			job_id = str(job_id).zfill(4)
+	# parallelize jobs by channel subsets
+	for ii, channel_subset in enumerate(data_source_info.channel_subsets):
+		# parallelize jobs by segments
+		for seg in segsdict[ifo]:
+			# set maximum number of jobs reading concurrently from the same frame file to prevent I/O locks
+			if ii / options.concurrency == 0:
+				dep_nodes = parent_nodes
+			else:
+				dep_nodes = [etg_nodes[(ii - options.concurrency, seg)]]
 
 			# creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
 			channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
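For a hypothetical channel entry H1:FOO_BAR with an fsamp of 512, the comprehension above yields the string --channel-name=H1:FOO_BAR:512, one such flag per channel in the subset.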
@@ -129,8 +132,8 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 			outpath = os.path.join(options.out_path, "gstlal_etg")
 
-			etg_nodes[seg] = \
-				inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
+			etg_nodes[(ii, seg)] = \
+				inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = dep_nodes,
 					opts = {"gps-start-time":int(seg[0]),
 						"gps-end-time":int(seg[1]),
 						"trigger-start-time":int(trig_start),
@@ -139,7 +142,7 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 						"mismatch":options.mismatch,
 						"qhigh":options.qhigh,
 						"channel-name":' '.join(channels),
-						"job-id":job_id,
+						"job-id":str(ii + 1).zfill(4),
 						"cadence":options.cadence,
 						"max-streams":options.max_serial_streams,
 						"disable-web-service":options.disable_web_service,
@@ -153,7 +156,7 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 					output_files = {"out-path":outpath}
 				)
 			if options.verbose:
-				print "Creating node for seg %s" % repr(seg)
+				print "Creating node for index, segment %s" % repr((ii, seg))
 			trig_start = int(seg[1])
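Keying etg_nodes by the (ii, seg) pair rather than by seg alone is what enables the chaining above: the single dependency for channel subset ii on a given segment is just the node stored at (ii - options.concurrency, seg). Note that the test ii / options.concurrency == 0 relies on Python 2 integer division and is equivalent to ii < options.concurrency.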
@@ -179,6 +182,7 @@ def parse_command_line():
 	parser.add_option("-q", "--qhigh", type = "float", default = 100, help = "Q high value for half sine-gaussian waveforms. Default = 100.")
 	parser.add_option("--max-parallel-streams", type = "int", default = 50, help = "Number of streams (sum(channel_i * num_rates_i)) to process in parallel. This gives the maximum number of channels to process for a given job. Default = 50.")
 	parser.add_option("--max-serial-streams", type = "int", default = 100, help = "Number of streams (sum(channel_i * num_rates_i)) to process serially within a given job. Default = 100.")
+	parser.add_option("--concurrency", type = "int", default = 4, help = "Maximum allowed number of parallel jobs reading from the same file, done to prevent I/O locks. Default = 4.")
 	parser.add_option("--segment-length", type = "int", default = 6000, help = "Maximum segment length to process per job. Default = 6000 seconds.")
 	parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
 	parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
@@ -194,6 +198,8 @@ def parse_command_line():
 	# set max parallel streams to options.max_streams for use in data_source_info for splitting up channel lists to process in parallel
 	options.max_streams = options.max_parallel_streams
+	# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check
 
 	# sanity check to enforce a minimum segment length
 	# Minimum segment length chosen so that the overlap is a ~33% hit in run time
 	min_segment_length = int(4 * idq_multirate_datasource.PSD_DROP_TIME)
Makefile
@@ -27,6 +27,9 @@ OUTPATH = $(PWD)
 MAX_PARALLEL_STREAMS = 50
 MAX_SERIAL_STREAMS = 100
+
+# Maximum number of concurrent reads from the same frame file, done to prevent I/O locks
+CONCURRENCY = 4
 
 # length of time to process for a given job
 SEGMENT_LENGTH = 6000
@@ -96,6 +99,7 @@ dag : frame.cache plots channel_list.txt segments.xml.gz
 		--out-path $(OUTPATH) \
 		--max-serial-streams $(MAX_SERIAL_STREAMS) \
 		--max-parallel-streams $(MAX_PARALLEL_STREAMS) \
+		--concurrency $(CONCURRENCY) \
 		--segment-length $(SEGMENT_LENGTH) \
 		--mismatch $(MISMATCH) \
 		--qhigh $(QHIGH) \
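Because CONCURRENCY is a plain Makefile variable passed through to --concurrency, the limit can presumably be adjusted per invocation without editing the Makefile, e.g. make dag CONCURRENCY=8 via make's standard command-line variable override.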