#!/usr/bin/env python
#
# Copyright (C) 2011 Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
### This program will create an HTCondor DAG to automate the running of
### low-latency, online gstlal_inspiral jobs; see gstlal_ll_trigger_pipe
"""
This program makes a DAG for a gstlal inspiral low-latency pipeline
"""
__author__ = 'Chad Hanna <channa@caltech.edu>'
#
# import standard modules
#
import itertools
import sys, os, stat
import shutil
import socket
from optparse import OptionParser
#
# import the modules we need to build the pipeline
#
from gstlal import inspiral
from gstlal import inspiral_pipe
from gstlal import dagparts
from gstlal import datasource
from lal.utils import CacheEntry
##
# ### Graph of the HTCondor DAG
#
# - gray boxes are optional and depend on the command line given
#
# @dot
# digraph G {
# // graph properties
#
# rankdir=LR;
# compound=true;
# node [shape=record fontsize=10 fontname="Verdana"];
# edge [fontsize=8 fontname="Verdana"];
# gstlal_inspiral [URL="\ref gstlal_inspiral"];
# gstlal_llcbcsummary [URL="\ref gstlal_llcbcsummary"];
# gstlal_llcbcnode [URL="\ref gstlal_llcbcnode"];
# gstlal_inspiral_marginalize_likelihoods_online [URL="\ref gstlal_inspiral_marginalize_likelihoods_online"];
# }
# @enddot
class zookeeper_job(dagparts.DAGJob):
"""
A zookeeper job
"""
def __init__(self, program = "zookeeper-server-start.sh", tag_base = "zookeeper-server-start", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(datadir)
except OSError:
pass
f = open("zookeeper.properties", "w")
f.write("""
# the directory where the snapshot is stored.
dataDir=%s
# the port at which the clients will connect
clientPort=%d
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=%d
""" % (datadir, port, maxclients))
f.close()
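# For illustration only: with the default arguments above, the generated
# zookeeper.properties would contain roughly
#   dataDir=<cwd>/zookeeper
#   clientPort=2181
#   maxClientCnxns=0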
class kafka_job(dagparts.DAGJob):
"""
A kafka job
"""
def __init__(self, program = "kafka-server-start.sh", tag_base = "kafka-server-start", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", condor_commands = {}):
"""
"""
dagparts.DAGJob.__init__(self, program, tag_base = tag_base, universe = "local", condor_commands = condor_commands)
try:
os.mkdir(logdir)
except OSError:
pass
f = open("kafka.properties", "w")
f.write("""
broker.id=0
listeners = PLAINTEXT://%s
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=%s
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.ms=100000
log.roll.ms = 1000000
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=%s
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
""" % (host, logdir, zookeeperaddr))
f.close()
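# For illustration only: with the defaults above, the generated kafka.properties
# would contain, e.g., "listeners = PLAINTEXT://10.14.0.112:9092",
# "log.dirs=<cwd>/kafka" and "zookeeper.connect=localhost:2181".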
#
# Parse the command line
#
def parse_command_line():
parser = OptionParser(description = __doc__)
# append all the datasource specific options
datasource.append_options(parser)
parser.add_option("--analysis-tag", metavar = "name", help = "Set the name of the analysis, used to distinguish between different DAGs running simultaneously.")
parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s. Note that 50% will be used for zero-padding.")
parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.")
parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
parser.add_option("--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
parser.add_option("--inj-framexmit-iface", metavar = "name", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set).")
parser.add_option("--inj-shared-memory-partition", metavar = "name", action = "append", help = "Set the name of the shared memory partition for a given instrument. Can be given multiple times as --inj-shared-memory-partition=IFO=PARTITION-NAME")
parser.add_option("--inj-shared-memory-assumed-duration", type = "int", default = 4, help = "Set the assumed span of files in seconds. Default = 4.")
parser.add_option("--inj-shared-memory-block-size", type = "int", default = 4096, help = "Set the byte size to read per buffer. Default = 4096.")
parser.add_option("--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
parser.add_option("--ht-gate-threshold-linear", metavar = "mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max", type="string", help = "Set the threshold on whitened h(t) to mark samples as gaps (glitch removal) with a linear scale of mchirp")
parser.add_option("--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
parser.add_option("--likelihood-cache", help = "set the cache containin likelihood files")
parser.add_option("--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")
parser.add_option("--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")
parser.add_option("--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
parser.add_option("--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
parser.add_option("--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
parser.add_option("--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
parser.add_option("--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
parser.add_option("--gracedb-group", default = "Test", help = "gracedb group, default Test")
parser.add_option("--gracedb-service-url", default = "https://gracedb.ligo.org/api/", help = "GraceDb service url, default https://gracedb.ligo.org/api/")
parser.add_option("--lvalert-server-url", default = "https://lvalert.cgca.uwm.edu", help = "lvalert server url, default https://lvalert.cgca.uwm.edu")
parser.add_option("--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
parser.add_option("--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
parser.add_option("--inj-lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times (for injection stream)")
parser.add_option("--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005). The light-travel time between instruments will be added automatically in the coincidence test.")
parser.add_option("--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
parser.add_option("--non-inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
parser.add_option("--local-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
parser.add_option("--inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value for inspiral jobs can be given multiple times")
parser.add_option("--injection-file", metavar = "filename", default = [], action = "append", help = "The injection xml files that corresponds to the low latency injections for given mass bins: only used for making missed found plots. 0000:0002:Injection_1.xml, 0002:0004:Injection_2.xml")
parser.add_option("--state-backup-destination", metavar = "URL", help = "Location to back state up to, e.g. gstlalcbc@ldas-pcdev1.ligo.caltech.edu.")
parser.add_option("--web-dir", help = "set the output path to write the ''offline'' style web page to")
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
parser.add_option("--run-output-kafka", action = "store_true", help = "Actually launch a zookeeper and kafka job as part of the dag")
parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--enable-auth", action = "store_true", default=False, help = "If set, enables authentication for the influx aggregator.")
parser.add_option("--enable-https", action = "store_true", default=False, help = "If set, enables HTTPS connections for the influx aggregator.")
options, filenames = parser.parse_args()
#
# extract data source configuration
#
datasourceinfo = datasource.GWDataSourceInfo(options)
fail = ""
for option in ("bank_cache",):
if getattr(options, option) is None:
fail += "must provide option %s\n" % (option)
if fail: raise ValueError(fail)
if options.injection_file:
inj_name_dict = datasource.injection_dict_from_channel_list_with_node_range(options.injection_file)
else:
inj_name_dict = {}
if options.data_source not in datasourceinfo.live_sources :
raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")
#FIXME add consistency check?
bankcache = inspiral_pipe.parse_cache_str(options.bank_cache)
channel_dict = datasourceinfo.channel_dict
state_channel_dict = datasourceinfo.state_channel_dict
dq_channel_dict = datasourceinfo.dq_channel_dict
framexmit_dict = datasourceinfo.framexmit_addr
shm_part_dict = datasourceinfo.shm_part_dict
inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
inj_framexmit_dict = datasource.framexmit_dict_from_framexmit_list(options.inj_framexmit_addr)
## A dictionary for injection shared memory partition
inj_shm_part_dict = {"H1": "LHO_InjData", "L1": "LLO_InjData", "V1": "VIRGO_InjData"}
if options.inj_shared_memory_partition is not None:
inj_shm_part_dict.update( datasource.channel_dict_from_channel_list(options.inj_shared_memory_partition) )
inj_range_dict = {}
for tag, channel in inj_name_dict.items():
inj_range_dict.setdefault(channel, []).append(tag)
for k,v in inj_range_dict.items():
inj_range_dict[k] = sorted(v)
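# For illustration, a hypothetical inj_name_dict of
#   {"0000": "Injection_1.xml", "0001": "Injection_1.xml", "0002": "Injection_2.xml"}
# would be inverted here into
#   {"Injection_1.xml": ["0000", "0001"], "Injection_2.xml": ["0002"]}
# i.e. one sorted list of job tags per injection file.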
if inj_channel_dict:
for nodes in inj_channel_dict.keys():
if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ):
raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")
options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
options.dq_vector_on_off_dict = datasourceinfo.dq_vector_on_off_bits
options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)]
options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)]
return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict
#
# MAIN
#
options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shm_part_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict, inj_shm_part_dict = parse_command_line()
try: os.mkdir("logs")
except: pass
try: os.mkdir("gracedb")
except: pass
if options.analysis_tag:
dag = dagparts.DAG("trigger_pipe_%s" % options.analysis_tag)
else:
dag = dagparts.DAG("trigger_pipe")
#
# setup the job classes
#
gstlalInspiralJob = dagparts.DAGJob('gstlal_inspiral', condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
if inj_channel_dict:
gstlalInspiralInjJob = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"}))
# A local universe job that will run in a loop marginalizing all of the likelihoods
margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
# an lvalert_listen job
listenJob = dagparts.DAGJob('gstlal_inspiral_lvalert_uberplotter', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
# Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
if options.output_kafka_server is not None and options.run_output_kafka:
zooJob = zookeeper_job("zookeeper-server-start.sh", tag_base = "zookeeper-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
kafkaJob = kafka_job("kafka-server-start.sh", tag_base = "kafka-server-start", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"})
# aggregator job
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
# DQ job
dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
if options.state_backup_destination:
# State saving job
stateJob = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
#
# Setup the Node classes
#
listenNode = dagparts.DAGNode(listenJob, dag, [], opts = {"gracedb-service-url": options.gracedb_service_url, "lvalert-server-url": options.lvalert_server_url})
# dq with default options
for ifo in channel_dict:
outpath = "aggregator"
try:
os.makedirs(outpath)
except OSError:
pass
# Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shm_part_dict[ifo]}),
"shared-memory-block-size":options.shared_memory_block_size,
"shared-memory-assumed-duration":options.shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"),
"state-vector-on-bits":options.state_vector_on_bits,
"state-vector-off-bits":options.state_vector_off_bits,
"dq-vector-on-bits":options.dq_vector_on_bits,
"dq-vector-off-bits":options.dq_vector_off_bits,
"data-source":options.data_source,
"out-path": outpath,
"data-backend": options.agg_data_backend,
}
common_opts.update(datasource_opts)
if options.agg_data_backend == 'influx':
common_opts.update({
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
"enable-auth": options.enable_auth,
"enable-https": options.enable_https,
})
if options.enable_auth:
common_opts.update({"enable-auth": ""})
if options.enable_https:
common_opts.update({"enable-https": ""})
dagparts.DAGNode(dqJob, dag, [], opts = common_opts)
#
# loop over banks to run gstlal inspiral pre clustering and far computation
#
jobTags = []
inj_jobTags = []
if options.ht_gate_threshold_linear is not None:
# Linear scale specified
template_mchirp_dict = inspiral_pipe.get_svd_bank_params_online(bank_cache.values()[0])
mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]
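# For example, a hypothetical --ht-gate-threshold-linear=0.8:15-45:50 parses to
#   mchirp_min = 0.8, ht_gate_threshold_min = 15.0, mchirp_max = 45.0, ht_gate_threshold_max = 50.0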
bank_groups = list(inspiral_pipe.build_bank_groups(bank_cache, [1], options.max_jobs - 1))
if len(options.likelihood_files) != len(bank_groups):
raise ValueError("Likelihood files must correspond 1:1 with bank files")
for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)):
svd_bank_string = ",".join([":".join([k, v[0]]) for k,v in svd_banks.items()])
jobTags.append("%04d" % num_insp_nodes)
# Calculate the appropriate ht-gate-threshold value
threshold_values = None
if options.ht_gate_threshold_linear is not None:
# Linear scale specified
# use max mchirp in a given svd bank to decide gate threshold
bank_mchirps = [template_mchirp_dict["%04d" % int(os.path.basename(svd_file).split("-")[1].split("_")[3])][1] for svd_file in svd_banks.items()[0][1]]
threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
else:
if options.ht_gate_threshold is not None:
threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given
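# Continuing the hypothetical example above (mchirp 0.8-45 mapped linearly to
# thresholds 15-50), a bank whose largest template mchirp is 10 would get a
# gate threshold of roughly (50 - 15) / (45 - 0.8) * (10 - 0.8) + 15 ~= 22.3.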
# Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(shm_part_dict, opt = "shared-memory-partition"),
"shared-memory-block-size":options.shared_memory_block_size,
"shared-memory-assumed-duration":options.shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.state_vector_on_bits,
"state-vector-off-bits":options.state_vector_off_bits,
"dq-vector-on-bits":options.dq_vector_on_bits,
"dq-vector-off-bits":options.dq_vector_off_bits,
"svd-bank":svd_bank_string,
"tmp-space":dagparts.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.gracedb_far_threshold,
"gracedb-group":options.gracedb_group,
"gracedb-pipeline":options.gracedb_pipeline,
"gracedb-search":options.gracedb_search,
"gracedb-service-url":options.gracedb_service_url,
"job-tag":jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"time-slide-file":options.time_slide_file,
"output-kafka-server": options.output_kafka_server
}
common_opts.update(datasource_opts)
inspNode = dagparts.DAGNode(gstlalInspiralJob, dag, [],
opts = common_opts,
input_files = {
"ranking-stat-input":[likefile],
"ranking-stat-pdf":options.marginalized_likelihood_file
},
output_files = {
"output":"/dev/null",
"ranking-stat-output":likefile,
"zerolag-rankingstat-pdf":zerolikefile
}
)
if str("%04d" %num_insp_nodes) in inj_channel_dict:
# FIXME The node number for injection jobs currently follows the same
# numbering system as non-injection jobs, except instead of starting at
# 0000 the numbering starts at 1000. There is probably a better way to
# do this in the future, this system was just the simplest to start
# with
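# For example, the injection job paired with non-injection job tag "0003" is
# tagged "1003".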
inj_jobTags.append("%04d" % (num_insp_nodes + 1000))
# Data source dag options
if (options.data_source == "framexmit"):
datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
"framexmit-iface":options.framexmit_iface
}
else :
datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(inj_shm_part_dict, opt = "shared-memory-partition"),
"shared-memory-block-size":options.inj_shared_memory_block_size,
"shared-memory-assumed-duration":options.inj_shared_memory_assumed_duration
}
common_opts = {"psd-fft-length":options.psd_fft_length,
"reference-psd":options.reference_psd,
"ht-gate-threshold":threshold_values,
"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
"state-vector-on-bits":options.inj_state_vector_on_bits,
"state-vector-off-bits":options.inj_state_vector_off_bits,
"dq-vector-on-bits":options.inj_dq_vector_on_bits,
"dq-vector-off-bits":options.inj_dq_vector_off_bits,
"svd-bank":svd_bank_string,
"tmp-space":dagparts.condor_scratch_space(),
"track-psd":"",
"control-peak-time":options.control_peak_time,
"coincidence-threshold":options.coincidence_threshold,
"fir-stride":options.fir_stride,
"data-source":options.data_source,
"gracedb-far-threshold":options.inj_gracedb_far_threshold,
"gracedb-group":options.inj_gracedb_group,
"gracedb-pipeline":options.inj_gracedb_pipeline,
"gracedb-search":options.inj_gracedb_search,
"gracedb-service-url":options.inj_gracedb_service_url,
"job-tag":inj_jobTags[-1],
"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
"min-instruments":options.min_instruments,
"time-slide-file":options.time_slide_file
}
common_opts.update(datasource_opts)
inspInjNode = dagparts.DAGNode(gstlalInspiralInjJob, dag, [],
opts = common_opts,
input_files = {
"ranking-stat-input":[likefile],
"ranking-stat-pdf":options.marginalized_likelihood_file
},
output_files = {
"output":"/dev/null"
}
)
def groups(l, n):
for i in xrange(0, len(l), n):
yield l[i:i+n]
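# For illustration: groups(["a", "b", "c", "d", "e"], 2) yields
# ["a", "b"], ["c", "d"], ["e"].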
margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
#
# set up aggregation jobs
#
#
# FIXME by default the inspiral jobs advertise the current directory as their
# job tag, but this should be made to be more flexible
#
# set up common settings for aggregation jobs
agg_options = {
"dump-period": 0,
"base-dir": "aggregator",
"job-tag": os.getcwd(),
"num-jobs": len(jobTags),
"num-threads": 2,
"job-start": 0,
"kafka-server": options.output_kafka_server,
"data-backend": options.agg_data_backend,
}
if options.agg_data_backend == 'influx':
agg_options.update({
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
})
if options.enable_auth:
agg_options.update({"enable-auth": ""})
if options.enable_https:
agg_options.update({"enable-https": ""})
# define routes used for aggregation jobs
snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
usage_routes = ["ram_history"]
agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
state_routes = []
for ifo in channel_dict.keys():
state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.append("%s_strain_dropped" % ifo)
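# For illustration, an instrument such as H1 contributes the routes
# "H1_dqvector_on", "H1_dqvector_off", "H1_dqvector_gap", "H1_statevector_on",
# "H1_statevector_off", "H1_statevector_gap" and "H1_strain_dropped".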
# analysis-based aggregation jobs
# FIXME don't hard code the 1000
max_agg_jobs = 1000
agg_job_bounds = range(0, len(jobTags), max_agg_jobs) + [max_agg_jobs]
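# Note: in the typical case of fewer than max_agg_jobs job tags,
# range(0, len(jobTags), max_agg_jobs) is just [0], so agg_job_bounds becomes
# [0, 1000] and the zip below yields a single chunk covering all jobs.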
for route in agg_routes:
agg_options["route"] = route
if route == "far_history":
agg_options["data-type"] = "min"
else:
agg_options["data-type"] = "max"
for aggstart, aggend in zip(agg_job_bounds[:-1], agg_job_bounds[1:]):
agg_options["job-start"] = aggstart
agg_options["num-jobs"] = aggend - aggstart
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
# state-based aggregation jobs
for routes in groups(state_routes, 2):
agg_options["route"] = routes
agg_options["data-type"] = "max"
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
#
# summary page
#
#if options.injection_file:
#
# pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
#
# for injfile, jobrange in inj_range_dict.items():
# aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]})
#
#else:
#
# pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
if options.state_backup_destination:
stateNode = dagparts.DAGNode(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {})
#
# Write out the dag and other files
#
dag.write_sub_files()
# we probably want these jobs to retry indefinitely on dedicated nodes. A user
# can intervene and fix a problem without having to bring the dag down and up.
# There are few enough total jobs that this really shouldn't bog down the
# scheduler. For now 10000 will be considered indefinite
[node.set_retry(10000) for node in dag.get_nodes()]
dag.write_dag()
dag.write_script()
dag.write_cache()