diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 288f3f25ef233a65523a91c96a5246bb1475a119..96a6664081269a62fa8833715f8040de51dee22a 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -245,6 +245,10 @@ def parse_command_line():
 	parser.add_option("--time-slide-file", metavar = "filename", help = "Set the time slide table xml file")
 	parser.add_option("--zookeeper-port", type = "int", metavar = "number", help = "Set the zookeeper port. default 2181", default = 2181)
 	parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the kafka server hostname to send output data to - note, for now this must be the cluster facing ip address of the submit node. example = 10.14.0.112:9092")
+	parser.add_option("--agg-data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --agg-data-backend = influx.")
+	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --agg-data-backend = influx.")
+	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --agg-data-backend = influx.")
 
 	options, filenames = parser.parse_args()
 
@@ -332,18 +336,17 @@ margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', univ
 listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False)
 
 # Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server
+
 if options.output_kafka_server is not None:
 	zooJob = zookeeper_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port)
 	kafkaJob = kafka_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port)
 	zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"})
 	kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"})
+
 # aggregator job
 aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
 
-# state job
-analysisStateJob = dagparts.DAGJob("gstlal_ll_inspiral_state", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
-
 # Summary page job
 pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
@@ -542,11 +545,34 @@ def groups(l, n):
 margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {})
 
+#
+# set up aggregation jobs
+#
+
 #
 # FIXME by default the inspiral jobs advertise the current directory as their
 # job tag, but this should be made to be more flexible
 #
 
+# set up common settings for aggregation jobs
+agg_options = {
+	"dump-period": 0,
+	"base-dir": "aggregator",
+	"job-tag": os.getcwd(),
+	"num-jobs": len(jobTags),
+	"num-threads": 2,
+	"job-start": 0,
+	"kafka-server": options.output_kafka_server,
+	"data-backend": options.agg_data_backend,
+}
+
+if options.agg_data_backend == 'influx':
+	agg_options.update({
+		"influx-database-name": options.influx_database_name,
+		"influx-hostname": options.influx_hostname,
+		"influx-port": options.influx_port,
+	})
+
 # define routes used for aggregation jobs
 snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict]
 network_routes = ["likelihood_history", "snr_history", "latency_history", "far_history"]
@@ -555,20 +581,26 @@ agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes))
 
 state_routes = []
 for ifo in channel_dict.keys():
-	state_routes.extend(["%s/dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-	state_routes.extend(["%s/statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-	state_routes.append("%s/strain_dropped" % ifo)
+	state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
+	state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
+	state_routes.append("%s_strain_dropped" % ifo)
 
 # analysis-based aggregation jobs
-for route in groups(agg_routes, 2):
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
-	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["min"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
+for routes in groups(agg_routes, 1):
+	all_agg_options = {"route": routes, "data-type": ["min", "max"]}
+	all_agg_options.update(agg_options)
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
 
 # state-based aggregation jobs
 for routes in groups(state_routes, 2):
-	analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "base-dir": "aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 4, "data-type": ["max"], "job-start": 0, "route": route, "kafka-server": options.output_kafka_server})
+	all_agg_options = {"route": routes, "data-type": "max"}
+	all_agg_options.update(agg_options)
+	aggNode = dagparts.DAGNode(aggJob, dag, [], opts = all_agg_options)
 
+#
 # summary page
+#
+
 if options.injection_file:
 	pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})