Skip to content
Snippets Groups Projects
Commit 160f32d4 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

gstlal_ll_inspiral_trigger_aggregator: port over to changed topic schema, allow for auth + https

parent 7a734894
No related branches found
No related tags found
No related merge requests found
Pipeline #73475 failed
......@@ -51,12 +51,9 @@ def retrieve_triggers(consumer, jobs, route_name = 'coinc', timeout = 1000, max_
### retrieve timeseries for all routes and topics
msg_pack = consumer.poll(timeout_ms = timeout, max_records = max_records)
for tp, messages in msg_pack.items():
job = tp.topic
if job not in jobs:
continue
for message in messages:
try:
triggers.extend(message.value[route_name])
triggers.extend(message.value)
except KeyError: ### no route in message
pass
......@@ -70,6 +67,7 @@ def parse_command_line():
# directory to put everything in
parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
parser.add_argument("--route", action="store", default="coinc", help="Specify the route where triggers are stored in.")
parser.add_argument("--dump-period", type = float, default = 1., help = "Wait this many seconds between dumps of the URLs (default = 1., set to 0 to disable)")
parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
......@@ -78,6 +76,8 @@ def parse_command_line():
parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
args = parser.parse_args()
......@@ -108,13 +108,31 @@ if __name__ == '__main__':
# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_trigger_aggregator' % jobs[0], auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
consumer = KafkaConsumer(
options.route,
bootstrap_servers=[options.kafka_server],
key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='%s_trigger_aggregator' % jobs[0],
auto_offset_reset='latest',
max_poll_interval_ms = 60000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
reconnect_backoff_ms=5000,
reconnect_backoff_max_ms=30000
)
else:
consumer = None
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https
)
else: ### hdf5 data backend
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
......@@ -127,13 +145,13 @@ if __name__ == '__main__':
# this is not threadsafe!
logging.info("retrieving data from kafka")
start = timeit.default_timer()
#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
triggers = retrieve_triggers(consumer, jobs, route_name = 'coinc', max_records = 2 * len(jobs))
#triggers = io.kafka.retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
triggers = retrieve_triggers(consumer, jobs, route_name = options.route, max_records = 2 * len(jobs))
elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % elapsed)
else:
logging.info("retrieving data from bottle routes")
triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = 'coinc', num_threads=options.num_threads)
triggers = io.http.retrieve_triggers(options.base_dir, jobs, options.job_tag, route_name = options.route, num_threads=options.num_threads)
# filter out triggers that don't have a far assigned yet
triggers = [trg for trg in triggers if 'combined_far' in trg]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment