Maintenance will be performed on,,, and starting at 10am CDT on 2 June 2020 and lasting for around 20 minutes. There will be a short period of downtime, around 5 minutes, towards the end of the maintenance window. Please direct any comments, questions, or concerns to

Commit 8a7c9da5 authored by Patrick Godwin's avatar Patrick Godwin

add gstlal_ll_inspiral_trigger_aggregator to gstlal-ugly, add job to gstlal_ll_inspiral_pipe

parent f9d64df6
......@@ -304,6 +304,7 @@ if options.output_kafka_server is not None and options.run_output_kafka:
# aggregator job
aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
trigaggJob = dagparts.DAGJob("gstlal_ll_inspiral_trigger_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command))
# Summary page job
#pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command))
......@@ -578,6 +579,25 @@ for route in agg_routes:
agg_options["num-jobs"] = aggend - aggstart
aggNode = dagparts.DAGNode(aggJob, dag, [], opts = agg_options)
# Trigger aggregation
trigagg_options = {
"dump-period": 0,
"base-dir": "aggregator",
"job-tag": os.getcwd(),
"num-jobs": len(jobTags),
"num-threads": 2,
"job-start": 0,
"kafka-server": options.output_kafka_server,
"data-backend": options.agg_data_backend,
if options.agg_data_backend == 'influx':
"influx-database-name": options.influx_database_name,
"influx-hostname": options.influx_hostname,
"influx-port": options.influx_port,
aggNode = dagparts.DAGNode(trigaggJob, dag, [], opts = trigagg_options)
# state-based aggregation jobs
for routes in groups(state_routes, 2):
agg_options["route"] = routes
......@@ -29,6 +29,7 @@ dist_bin_SCRIPTS = \
gstlal_cache_to_segments \
gstlal_ligolw_add_without_reassign \
gstlal_ll_inspiral_aggregator \
gstlal_ll_inspiral_trigger_aggregator \
gstlal_ll_dq \
gstlal_condor_top \
gstlal_injsplitter \
#!/usr/bin/env python
# Copyright (C) 2016 Kipp Cannon, Chad Hanna
# Copyright (C) 2019 Patrick Godwin
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import argparse
import json
import logging
from multiprocessing import Pool
import sys, os
import time
import timeit
import numpy
from ligo.scald import aggregator
from ligo.scald import io
# =============================================================================
# Command Line
# =============================================================================
def retrieve_triggers(consumer, jobs, route_name = 'coinc', timeout = 1000, max_records = 1000):
A function to pull triggers from kafka for a set of jobs (topics) and
route_name (key in the incoming json messages)
triggers = []
### retrieve timeseries for all routes and topics
msg_pack = consumer.poll(timeout_ms = timeout, max_records = max_records)
for tp, messages in msg_pack.items():
job = tp.topic
if job not in jobs:
for message in messages:
except KeyError: ### no route in message
return triggers
# Read command line options
def parse_command_line():
parser = argparse.ArgumentParser(description="Online data aggregator")
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.")
parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example:")
parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
args = parser.parse_args()
assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
return args
# =============================================================================
# Main
# =============================================================================
if __name__ == '__main__':
options = parse_command_line()
# FIXME don't hardcode some of these?
jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
pool = Pool(options.num_threads)
# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_trigger_aggregator' % jobs[0], auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
consumer = None
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
else: ### hdf5 data backend
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
# start an infinite loop to keep updating and aggregating data
while True:"sleeping for %.1f s" % options.dump_period)
if consumer:
# this is not threadsafe!"retrieving data from kafka")
start = timeit.default_timer()
#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
triggers = retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
elapsed = timeit.default_timer() - start"time to retrieve data: %.1f s" % elapsed)
else:"retrieving data from bottle routes")
triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = 'coinc', num_threads=options.num_threads)
# filter out triggers that don't have a far assigned yet
triggers = [trg for trg in triggers if 'combined_far' in trg]
# store and reduce data for each job
if triggers:
start = timeit.default_timer()"storing and reducing triggers")
agg_sink.store_triggers('triggers', triggers, far_key = 'combined_far', time_key = 'end')
elapsed = timeit.default_timer() - start"time to store/reduce triggers: %.1f s" % elapsed)
else:"no triggers to process")
# close connection to consumer if using kafka
if consumer:
# always end on an error so that condor won't think we're done and will
# restart us
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment