Commit 434d2835 authored by Patrick Godwin

gstlal_etg_pipe + Makefile.gstlal_etg_offline: tweak offline ETG condor run generation to use local frame caching and to process channel lists serially; adjust cpu and memory requests for condor submission
parent acee0300
gstlal_etg_pipe
#!/usr/bin/env python
#
# Copyright (C) 2011-2017 Chad Hanna, Duncan Meacher
# Copyright (C) 2011-2018 Chad Hanna, Duncan Meacher, Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
@@ -20,7 +20,7 @@
This program makes a dag to run gstlal_etg offline
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>'
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
##############################################################################
# import standard modules and append the lalapps prefix to the python path
@@ -110,99 +110,39 @@ def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 30):
# get a dictionary of all the channels per gstlal_etg job
#
def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_source_info):
    etg_nodes = {}
    cumsum_rates = 0
    total_rates = 0
    outstr = ""
    n_channels = 0
    n_process = 0
    trig_start = options.gps_start_time
    write_channel_list = True
    # Loop over all channels to determine number of threads and minimum number of processes needed
    for ii, channel in enumerate(channels,1):
        samp_rate = data_source_info.channel_dict[channel]['fsamp']
        max_samp_rate = min(2048, int(samp_rate))
        min_samp_rate = min(32, max_samp_rate)
        n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
        cumsum_rates += n_rates
        total_rates += n_rates
        if cumsum_rates >= options.threads or ii == len(data_source_info.channel_dict.keys()):
            n_process += 1
            cumsum_rates = 0
    # Create more even distribution of channels across minimum number of processes
    n_threads = math.ceil(total_rates / n_process)
    if options.verbose:
        print "Total threads =", total_rates
        print "Total jobs needed =", n_process
        print "Evenly distributed threads per job =", int(n_threads)
    for seg in segsdict[ifo]:
        cumsum_rates = 0
        out_index = 0
        channel_list = []
        for ii, channel in enumerate(channels,1):
            n_channels += 1
            samp_rate = data_source_info.channel_dict[channel]['fsamp']
            max_samp_rate = min(2048, int(samp_rate))
            min_samp_rate = min(32, max_samp_rate)
            n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
            cumsum_rates += n_rates
            outstr = outstr + channel + ":" + str(int(samp_rate))
            channel_list.append(channel + " " + str(int(samp_rate)))
            outpath = os.path.join(options.out_path, "gstlal_etg")
            # Adds channel to current process
            if cumsum_rates < n_threads and ii < len(data_source_info.channel_dict.keys()):
                outstr = outstr + " --channel-name="
            # Finalise each process once number of threads passes threshold
            if cumsum_rates >= n_threads or ii == len(data_source_info.channel_dict.keys()):
                out_index += 1
                etg_nodes[channel] = \
                    inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
                        opts = {"gps-start-time":int(seg[0]),
                            "gps-end-time":int(seg[1]),
                            "trigger-start-time":int(trig_start),
                            "trigger-end-time":int(seg[1]),
                            "data-source":"frames",
                            "channel-name":outstr,
                            "mismatch":options.mismatch,
                            "qhigh":options.qhigh,
                            "cadence":options.cadence,
                            "job-id": "%04d" %out_index,
                            "disable-web-service":options.disable_web_service,
                            "frame-segments-name": options.frame_segments_name,
                            "save-hdf": options.save_hdf,
                            "verbose":options.verbose
                        },
                        input_files = {"frame-cache":options.frame_cache,
                            "frame-segments-file":options.frame_segments_file},
                        output_files = {"out-path":outpath}
                    )
                if options.verbose and write_channel_list is True:
                    print "Job %04d, number of channels = %3d, number of threads = %4d" %(out_index, n_channels, cumsum_rates)
                if write_channel_list is True:
                    listpath = options.out_path + "/gstlal_etg/channel_lists/channel_list_%04d.txt" %(out_index)
                    f = open(listpath,'w')
                    for channel_out in channel_list:
                        f.write(channel_out+'\n')
                    f.close()
                cumsum_rates = 0
                outstr = ""
                n_channels = 0
                channel_list = []
        outpath = os.path.join(options.out_path, "gstlal_etg")
        etg_nodes[seg] = \
            inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
                opts = {"gps-start-time":int(seg[0]),
                    "gps-end-time":int(seg[1]),
                    "trigger-start-time":int(trig_start),
                    "trigger-end-time":int(seg[1]),
                    "data-source":"frames",
                    "mismatch":options.mismatch,
                    "qhigh":options.qhigh,
                    "channel-list":options.channel_list,
                    "cadence":options.cadence,
                    "disable-web-service":options.disable_web_service,
                    "local-frame-caching":options.local_frame_caching,
                    "frame-segments-name": options.frame_segments_name,
                    "save-hdf": options.save_hdf,
                    "verbose":options.verbose
                },
                input_files = {"frame-cache":options.frame_cache,
                    "frame-segments-file":options.frame_segments_file},
                output_files = {"out-path":outpath}
            )
        if options.verbose:
            print "Creating node for seg %s" % repr(seg)
        trig_start = int(seg[1])
        write_channel_list = False
    return etg_nodes
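In the new per-segment scheme above, each segment gets a single node covering the full channel list, and trig_start is handed forward so that consecutive trigger windows do not overlap even when the analysis segments themselves do. A toy illustration with invented GPS boundaries (the real ones come from segsdict[ifo]):

# Invented GPS boundaries for illustration only; real segments come from segsdict[ifo].
gps_start_time = 1187000000
segments = [(1187000000, 1187005000), (1187004500, 1187010000)]  # overlapping segments

trig_start = gps_start_time
for seg in segments:
    # Analyze the whole segment, but only keep triggers from trig_start to the segment end.
    print("analyze [%d, %d), keep triggers in [%d, %d)" % (seg[0], seg[1], trig_start, seg[1]))
    trig_start = int(seg[1])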
@@ -214,12 +154,12 @@ def parse_command_line():
    parser = OptionParser(description = __doc__)
    # generic data source options
    #datasource.append_options(parser)
    multichannel_datasource.append_options(parser)
    # trigger generation options
    parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
    parser.add_option("--disable-web-service", action = "store_true", help = "If set, disables web service that allows monitoring of PSDs of aux channels.")
    parser.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data and stores to local filespace.")
    parser.add_option("--description", metavar = "string", default = "GSTLAL_IDQ_TRIGGERS", help = "Set the filename description in which to save the output.")
    parser.add_option("--cadence", type = "int", default = 32, help = "Rate at which to write trigger files to disk. Default = 32 seconds.")
    parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
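The --local-frame-caching option added above is described only by its help string here. As a general illustration of the idea, with hypothetical helper and path names that are not the gstlal implementation, local frame caching amounts to copying the frames listed in a LAL cache onto local scratch space and pointing the cache entries at the copies:

import os
import shutil

def cache_frames_locally(cache_lines, scratch_dir):
    # Hypothetical helper, not the gstlal implementation: copy each frame file named in a
    # LAL-format cache line (observatory, description, GPS start, duration, URL) to local
    # scratch space and return cache lines that point at the local copies.
    local_lines = []
    for line in cache_lines:
        obs, desc, start, dur, url = line.split()
        src = url.replace("file://localhost", "")
        dst = os.path.join(scratch_dir, os.path.basename(src))
        shutil.copy(src, dst)
        local_lines.append(" ".join([obs, desc, start, dur, "file://localhost" + dst]))
    return local_lines

# e.g. local_cache = cache_frames_locally(open("frame.cache").read().splitlines(), "/tmp/scratch")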
@@ -284,7 +224,7 @@ segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg,
# ETG jobs
#
etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, channels, data_source_info)
etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, data_source_info)
#
# all done
Makefile.gstlal_etg_offline
@@ -2,7 +2,7 @@ SHELL := /bin/bash
# condor commands
# Set the accounting tag from https://ldas-gridmon.ligo.caltech.edu/ldg_accounting/user
ACCOUNTING_TAG=ligo.dev.o3.detchar.onlinedq.idq
GROUP_USER=duncan.meacher
GROUP_USER=albert.einstein
CONDOR_COMMANDS:=--condor-command=accounting_group=$(ACCOUNTING_TAG) --condor-command=accounting_group_user=$(GROUP_USER)
#########################
@@ -19,8 +19,8 @@ FSTART=$(shell echo $$((${START}-${SEG_PAD})))
STOP = 1187100000
OUTPATH = $(PWD)
# Number of threads (N_channels x N_rates_per_channel) that each cpu will process
N_THREADS = 50
# Target number of streams (N_channels x N_rates_per_channel) that each cpu will process
N_STREAMS = 50
MISMATCH = 0.05
QHIGH = 100
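For scale, each channel contributes one stream per octave-spaced sample rate between 32 Hz and min(2048, fsamp) Hz, the same arithmetic as the removed job-splitting logic in gstlal_etg_pipe above. A standalone sketch with made-up sampling rates rather than a real channel list; before this commit the batch count below set the number of condor jobs, while after it the channel list is handled serially within each job, presumably in batches of at most N_STREAMS streams:

import math
import numpy

# Made-up sampling rates (Hz); a real run reads these from the channel list / channel_dict.
channel_rates = {"H1:FAKE-CHANNEL_A": 16384, "H1:FAKE-CHANNEL_B": 512, "H1:FAKE-CHANNEL_C": 256}

def streams_per_channel(fsamp, max_rate = 2048, min_rate = 32):
    # Octave-spaced rates between min_rate and min(max_rate, fsamp): log2(max/min) + 1 streams.
    max_samp_rate = min(max_rate, int(fsamp))
    min_samp_rate = min(min_rate, max_samp_rate)
    return int(numpy.log2(max_samp_rate / min_samp_rate) + 1)

N_STREAMS = 50  # matches the Makefile value above
total_streams = sum(streams_per_channel(rate) for rate in channel_rates.values())
n_batches = int(math.ceil(total_streams / float(N_STREAMS)))
print("total streams = %d, batches of at most %d streams = %d" % (total_streams, N_STREAMS, n_batches))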
@@ -82,13 +82,14 @@ dag : frame.cache plots channel_list.txt segments.xml.gz
		--frame-segments-file segments.xml.gz \
		--frame-segments-name datasegments \
		--channel-list channel_list.txt \
		--local-frame-caching \
		--out-path $(OUTPATH) \
		--threads $(N_THREADS)\
		--max-streams $(N_STREAMS)\
		--mismatch $(MISMATCH) \
		--qhigh $(QHIGH) \
		$(CONDOR_COMMANDS) \
		--request-cpu 4 \
		--request-memory 3GB \
		--request-cpu 2 \
		--request-memory 8GB \
		--verbose \
		--disable-web-service