Skip to content
Snippets Groups Projects
Commit 99f97f30 authored by Duncan Meacher's avatar Duncan Meacher
Browse files

ETG pipe: distribute channels evenely across all jobs

parent 8adbb17f
No related branches found
No related tags found
No related merge requests found
......@@ -58,10 +58,32 @@ lsctables.use_in(LIGOLWContentHandler)
def etg_node_gen(gstlalETGJob, dag, parent_nodes, options, channels, data_source_info):
etg_nodes = {}
cumsum_rates = 0
total_rates = 0
outstr = ""
out_index = 0
n_channels = 0
n_cpu = 0
for ii, channel in enumerate(channels,1):
samp_rate = data_source_info.channel_dict[channel]['fsamp']
max_samp_rate = min(2048, int(samp_rate))
min_samp_rate = min(32, max_samp_rate)
n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
cumsum_rates += n_rates
total_rates += n_rates
if cumsum_rates >= options.streams or ii == len(data_source_info.channel_dict.keys()):
n_cpu += 1
cumsum_rates = 0
n_streams = math.ceil(total_rates / n_cpu)
if options.verbose:
print "Total streams =", total_rates
print "Total jobs needed =", n_cpu
print "Evenly distributed streams per job =", int(n_streams)
cumsum_rates = 0
for ii, channel in enumerate(channels,1):
n_channels += 1
samp_rate = data_source_info.channel_dict[channel]['fsamp']
max_samp_rate = min(2048, int(samp_rate))
min_samp_rate = min(32, max_samp_rate)
......@@ -69,10 +91,12 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, options, channels, data_source
cumsum_rates += n_rates
outstr = outstr + channel + ":" + str(int(samp_rate))
if cumsum_rates < options.streams:
#if cumsum_rates < options.streams:
if cumsum_rates < n_streams:
outstr = outstr + " --channel-name="
if cumsum_rates >= options.streams or ii == len(data_source_info.channel_dict.keys()):
#if cumsum_rates >= options.streams or ii == len(data_source_info.channel_dict.keys()):
if cumsum_rates >= n_streams or ii == len(data_source_info.channel_dict.keys()):
out_index += 1
outpath = options.out_path + "/gstlal_etg/gstlal_etg_%04d" % out_index
etg_nodes[channel] = \
......@@ -90,8 +114,11 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, options, channels, data_source
input_files = {"frame-cache":options.frame_cache},
output_files = {"out-path":outpath}
)
if options.verbose:
print "Job %04d, number of channels = %3d, number of streams = %4d" %(out_index, n_channels, cumsum_rates)
cumsum_rates = 0
outstr = ""
n_channels = 0
return etg_nodes
......@@ -175,4 +202,4 @@ etg_nodes = etg_node_gen(gstlalETGJob, dag, [], options, channels, data_source_i
dag.write_sub_files()
dag.write_dag()
dag.write_script()
dag.write_cache()
#dag.write_cache()
......@@ -16,7 +16,7 @@ STOP = 1176639000
OUTPATH = $(PWD)
# Number of streams (N_channels x N_rates_per_channel) that each processor will analise
N_STREAMS = 100
N_STREAMS = 500
MISMATCH = 0.2
QHIGH = 40
......@@ -49,7 +49,7 @@ all : dag
# Run etg pipe to produce dag
dag : frame.cache plots channel_list.txt
gstlal_etg_pipe \
./gstlal_etg_pipe \
--data-source frames \
--gps-start-time $(START) \
--gps-end-time $(STOP) \
......@@ -62,6 +62,7 @@ dag : frame.cache plots channel_list.txt
$(CONDOR_COMMANDS) \
--request-cpu 2 \
--request-memory 5GB \
--verbose \
--disable-web-service
# --web-dir $(WEBDIR) \
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment