Commit 90a2e91d authored by Patrick Godwin's avatar Patrick Godwin

gstlal_ll_inspiral_aggregator: remove unnecessary Pool, change how kafka...

gstlal_ll_inspiral_aggregator: remove unnecessary Pool, change how kafka topics/partitions are arranged, add --across-jobs option to allow leaders
parent 23cf0cf5
......@@ -21,7 +21,6 @@
import argparse
import json
import logging
from multiprocessing import Pool
import sys, os
import time
import timeit
......@@ -62,6 +61,7 @@ def parse_command_line():
parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
parser.add_argument("--enable-auth", action = "store_true", help = "If set, enables authentication for the influx aggregator.")
parser.add_argument("--enable-https", action = "store_true", help = "If set, enables HTTPS connections for the influx aggregator.")
parser.add_argument("--across-jobs", action = "store_true", help = "If set, aggregate data across jobs as well.")
args = parser.parse_args()
......@@ -91,18 +91,35 @@ if __name__ == '__main__':
logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
pool = Pool(options.num_threads)
# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
if options.kafka_server:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]), auto_offset_reset='latest', max_poll_interval_ms = 60000, session_timeout_ms=30000, heartbeat_interval_ms=10000, reconnect_backoff_ms=5000, reconnect_backoff_max_ms=30000)
consumer = KafkaConsumer(
*routes,
bootstrap_servers=[options.kafka_server],
key_deserializer=lambda m: json.loads(m.decode('utf-8')),
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='%s_%s_aggregator' % (routes[0], options.data_type[0]),
auto_offset_reset='latest',
max_poll_interval_ms = 60000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
reconnect_backoff_ms=5000,
reconnect_backoff_max_ms=30000
)
else:
consumer = None
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name, auth=options.enable_auth, https=options.enable_https)
agg_sink = io.influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
auth=options.enable_auth,
https=options.enable_https,
reduce_across_tags=options.across_jobs
)
else: ### hdf5 data backend
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
......@@ -119,7 +136,7 @@ if __name__ == '__main__':
# this is not threadsafe!
logging.info("retrieving data from kafka")
start = timeit.default_timer()
datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, max_records = 2 * len(jobs) * len(routes))
datadata = io.kafka.retrieve_timeseries(consumer, routes, max_records = 2 * len(jobs) * len(routes))
elapsed = timeit.default_timer() - start
logging.info("time to retrieve data: %.1f s" % elapsed)
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment