
Commit 62d22f32 authored by Patrick Godwin

gstlal_ll_inspiral_aggregator: remove unnecessary Pool, change how kafka topics/partitions are arranged, add --across-jobs option to allow leaders
parent b2abff37
Pipeline #70749 passed with stages in 23 minutes and 1 second
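For context on the topic rearrangement: previously the consumer subscribed to one topic per job (*jobs); after this commit it subscribes to one topic per route (*routes), with each message keyed by the job that produced it, which is why a key_deserializer appears in the diff below. A minimal producer-side sketch of that layout (not part of this commit), assuming kafka-python and using hypothetical route and job names:

    # Producer-side sketch of the new layout (illustration only).
    # One topic per route, message key = job id; 'latency_history'
    # and '0001' are hypothetical names.
    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        key_serializer=lambda k: json.dumps(k).encode('utf-8'),
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    # send one timeseries chunk for job '0001' on the 'latency_history' route
    producer.send('latency_history', key='0001',
                  value={'time': [1234567890.0], 'data': [0.8]})
    producer.flush()

With job ids carried in message keys, a single consumer group can spread all jobs across the partitions of each route topic, which is what lets the commit drop the per-process Pool.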
@@ -21,7 +21,6 @@
 import argparse
 import json
 import logging
-from multiprocessing import Pool
 import sys, os
 import time
 import timeit
@@ -62,6 +61,7 @@ def parse_command_line():
 	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
 	parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
 	parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
+	parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
 	args = parser.parse_args()
@@ -91,18 +91,35 @@ if __name__ == '__main__':
 	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
-	pool = Pool(options.num_threads)
-
-	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
+		consumer = KafkaConsumer(
+			*routes,
+			bootstrap_servers=[options.kafka_server],
+			key_deserializer=lambda m: json.loads(m.decode('utf-8')),
+			value_deserializer=lambda m: json.loads(m.decode('utf-8')),
+			group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]),
+			auto_offset_reset='latest',
+			max_poll_interval_ms = 60000,
+			session_timeout_ms=30000,
+			heartbeat_interval_ms=10000,
+			reconnect_backoff_ms=5000,
+			reconnect_backoff_max_ms=30000
+		)
 	else:
 		consumer = None
 
 	# set up aggregator sink
 	if options.data_backend == 'influx':
-		agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name, auth=options.enable_auth, https=options.enable_https)
+		agg_sink = io.influx.Aggregator(
+			hostname=options.influx_hostname,
+			port=options.influx_port,
+			db=options.influx_database_name,
+			auth=options.enable_auth,
+			https=options.enable_https,
+			reduce_across_tags=options.across_jobs
+		)
 	else: ### hdf5 data backend
 		agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
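The new --across-jobs flag is forwarded to the influx sink as reduce_across_tags above. A rough sketch of what an across-jobs reduction could look like (an assumption for illustration only; the actual reduction lives inside gstlal's io.influx.Aggregator):

    # Hypothetical illustration of an across-jobs reduction: collapse
    # per-job series (one tag per job) into a single series by applying
    # a reduction, e.g. max, across jobs at each time sample.
    import numpy as np

    def reduce_across_jobs(series_by_job, reduce_func=np.max):
        # series_by_job: job id -> (times, values), all on a common time grid
        times = next(iter(series_by_job.values()))[0]
        stacked = np.vstack([values for _, values in series_by_job.values()])
        return times, reduce_func(stacked, axis=0)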
@@ -119,7 +136,7 @@
 		# this is not threadsafe!
 		logging.info("retrieving data from kafka")
 		start = timeit.default_timer()
-		datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, max_records = 2 * len(jobs) * len(routes))
+		datadata = io.kafka.retrieve_timeseries(consumer, routes, max_records = 2 * len(jobs) * len(routes))
 		elapsed = timeit.default_timer() - start
 		logging.info("time to retrieve data: %.1f s" % elapsed)
 	else:
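With routes as topics, retrieve_timeseries no longer needs the jobs argument: the originating job now arrives as each message's key. A rough sketch of the kind of grouping this implies (an assumption; the real implementation is in gstlal's io.kafka module and may differ):

    # Sketch: poll one batch and group records by route (topic) and job (key).
    from collections import defaultdict

    def retrieve_timeseries_sketch(consumer, routes, max_records=1000):
        data = {route: defaultdict(lambda: {'time': [], 'data': []}) for route in routes}
        for tp, records in consumer.poll(timeout_ms=1000, max_records=max_records).items():
            for record in records:
                payload = record.value  # e.g. {'time': [...], 'data': [...]}
                data[tp.topic][record.key]['time'].extend(payload['time'])
                data[tp.topic][record.key]['data'].extend(payload['data'])
        return data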