Commit d9640f30 authored by Patrick Godwin

gstlal_ll_inspiral_pipe: modify number of aggregation jobs, add option to switch between influx and hdf5 agg backends
parent 4ed730d6
@@ -245,6 +245,10 @@ def parse_command_line():
parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
options, filenames = parser.parse_args()
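
The help strings say the three influx options are required when the influx backend is selected, but no such check is visible in the hunk above. A minimal sketch of how that constraint could be enforced after parse_args() (hypothetical, not part of this commit):

    if options.agg_data_backend == "influx":
        # attribute names optparse derives from the long option names
        required = ("influx_hostname", "influx_port", "influx_database_name")
        missing = [name for name in required if getattr(options, name) is None]
        if missing:
            raise ValueError("--agg-data-backend=influx requires %s" % ", ".join("--" + name.replace("_", "-") for name in missing))
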
@@ -332,18 +336,17 @@ margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', univ
listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False)
# Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
if options.output_kafka_server is not None:
    zooJob = zookeeper_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
    kafkaJob = kafka_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
    zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
    kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"})
# aggregator job
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# state job
analysisStateJob = dagparts.DAGJob("gstlal_ll_inspiral_state", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
@@ -542,11 +545,34 @@ def groups(l, n):
margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
#
# set up aggregation jobs
#
#
# FIXME by default the inspiral jobs advertise the current directory as their
# job tag, but this should be made to be more flexible
#
# set up common settings for aggregation jobs
agg_options = {
"dump-period": 0,
"base-dir": "aggregator",
"job-tag": os.getcwd(),
"num-jobs": len(jobTags),
"num-threads": 2,
"job-start": 0,
"kafka-server": options.output_kafka_server,
"data-backend": options.agg_data_backend,
}
if options.agg_data_backend == 'influx':
    agg_options.update({
        "influx-database-name": options.influx_database_name,
        "influx-hostname": options.influx_hostname,
        "influx-port": options.influx_port,
    })
# define routes used for aggregation jobs
snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
@@ -555,20 +581,26 @@ agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
state_routes = []
for ifo in channel_dict.keys():
state_routes.extend(["%s/dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.extend(["%s/statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.append("%s/strain_dropped" % ifo)
state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
state_routes.append("%s_strain_dropped" % ifo)
# analysis-based aggregation jobs
for route in groups(agg_routes, 2):
    aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
    aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
for routes in groups(agg_routes, 1):
all_agg_options = {"route": routes, "data-type": ["min", "max"]}
all_agg_options.update(agg_options)
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
# state-based aggregation jobs
for routes in groups(state_routes, 2):
    analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
    all_agg_options = {"route": routes, "data-type": "max"}
    all_agg_options.update(agg_options)
    aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
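
For context, the number of aggregation jobs is set by how the routes are chunked: the removed lines grouped agg_routes two per node, while the new lines create one node per route (and send the state routes through the same aggJob rather than the separate state job). A minimal sketch of the effect, assuming groups() is the usual fixed-size chunking helper named in the hunk header above and using hypothetical route names:

    def groups(l, n):
        # assumed implementation: yield successive n-element chunks of l
        for i in range(0, len(l), n):
            yield l[i:i + n]

    agg_routes = ["H1_snr_history", "L1_snr_history", "likelihood_history", "far_history"]  # hypothetical
    print(len(list(groups(agg_routes, 2))))  # 2 aggregator nodes (old grouping)
    print(len(list(groups(agg_routes, 1))))  # 4 aggregator nodes, one per route (new grouping)
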
#
# summary page
#
if options.injection_file:
    pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
......