Commit b781a226 authored by Patrick Godwin

gstlal_ll_inspiral_aggregator: use new aggregation classes from datamon to aggregate data, tidy up imports
parent 990c33db
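
For context, the heart of this change is that both storage backends now sit behind a common aggregator sink, so the per-route loop in the main script collapses to a single store_and_reduce() call. The sketch below condenses that pattern from the diff that follows; the datamon class names and call signatures are copied from the diff itself, while the backend choice, hostname, port, database name, directories, job tag and aggregate names are hypothetical placeholders, and an installed datamon package plus a reachable InfluxDB instance (or writable HDF5 root directory) are assumed.

# Condensed sketch of the new aggregation pattern, not the full script.
# Assumes datamon is installed and the chosen backend is reachable; all
# literal values below are illustrative placeholders.
from datamon import io

data_backend = 'influx'                  # or 'hdf5'
routes = ['latency_history']             # one measurement per route (placeholder name)
jobs = ['%04d' % i for i in range(4)]    # per-job tags, formatted as in the script

# both sink classes expose the same store_and_reduce() interface (see diff below)
if data_backend == 'influx':
	agg_sink = io.influx.InfluxDBAggregator(hostname='localhost', port=8086, db='gstlal_metrics')
else:
	agg_sink = io.hdf5.HDF5Aggregator(rootdir='/path/to/base_dir', num_processes=2)

# each polling cycle: pull per-job timeseries over HTTP (or Kafka), then
# store and reduce them per route, tagged by job
datadata = io.http.retrieve_timeseries('/path/to/base_dir', jobs, routes, 'job_tag', num_threads=2)
for route in routes:
	agg_sink.store_and_reduce(route, datadata[route], 'data', tags='job', aggregates=('min', 'max', 'median'))

The aggregates tuple mirrors the min/max/median choices the script exposes through its data-type option.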
 #!/usr/bin/env python
 #
 # Copyright (C) 2016 Kipp Cannon, Chad Hanna
+# Copyright (C) 2019 Patrick Godwin
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License as published by the
@@ -17,29 +18,19 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
-import h5py
-import numpy
-import sys, os
-import itertools
 import argparse
-import lal
-from lal import LIGOTimeGPS
-import time
-from gi.repository import GLib
-import json
 import logging
-import subprocess
-import urllib2
-import shutil
-import collections
-from multiprocessing import Pool
+import json
+import sys, os
+import time
+import timeit
+import numpy
+from datamon import aggregator
+from datamon import io
 
 MIN_TIME_QUANTA = 10000
 DIRS = 6
 
 #
 # =============================================================================
@@ -93,59 +84,49 @@ if __name__ == '__main__':
 	options = parse_command_line()
 
 	# FIXME don't hardcode some of these?
 	datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
 	jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
 	routes = options.route
 
 	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
-	pool = Pool(options.num_threads)
-	prevdataspan = set()
-
-	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
-	if options.kafka_server is not None:
+	# We instantiate multiple consumers (based on --num-threads) to subscribe to all of our topics, i.e., jobs
+	if options.kafka_server:
 		from kafka import KafkaConsumer
-		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
+		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('utf-8')), auto_offset_reset='latest', group_id='%s_aggregator' % routes[0], max_poll_interval_ms = 60000)
 	else:
 		consumer = None
 
-	# Instantiate influxDB connection if data backend is influx
+	# set up aggregator sink
 	if options.data_backend == 'influx':
-		influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
-	else:
-		influx_client = None
+		agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
+	else: ### hdf5 data backend
+		agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
 
 	# start an infinite loop to keep updating and aggregating data
 	while True:
-		logging.info("sleeping")
+		logging.info("sleeping for %.1f s" % options.dump_period)
 		time.sleep(options.dump_period)
-		dataspan = set()
 
-		if consumer is not None:
+		if consumer:
 			# this is not threadsafe!
-			logging.info("getting data from kafka")
-			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
+			logging.info("retrieving data from kafka")
+			start = timeit.default_timer()
+			datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, max_records = 2 * len(jobs) * len(routes))
+			elapsed = timeit.default_timer() - start
+			logging.info("time to retrieve data: %.1f s" % elapsed)
 		else:
-			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
+			logging.info("retrieving data from bottle routes")
+			datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
 
+		# store and reduce data for each job
+		start = timeit.default_timer()
 		for route in routes:
-			# First get the raw and reduced data for each job in parallel
-			if influx_client:
-				io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
-			else:
-				mapargs = [(options.base_dir, route, job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs]
-				for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
-					dataspan.update(ds)
-				prevdataspan = dataspan.copy()
-
-		# Then reduce the data across jobs at each level
-		mapargs = []
-		for start, end in zip(*aggregator.job_expanse(dataspan)):
-			mapargs = []
-			for route in routes:
-				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
-			pool.map(io.hdf5.reduce_across_tags, mapargs)
-			logging.info("processed reduced data in [%d %d) at %d for route %s" % (int(start), int(end), int(aggregator.now()), route))
+			logging.info("storing and reducing timeseries for measurement: %s" % route)
+			agg_sink.store_and_reduce(route, datadata[route], 'data', tags='job', aggregates=options.data_type)
+		elapsed = timeit.default_timer() - start
+		logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
 
 	#
 	# always end on an error so that condor won't think we're done and will
......
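
One detail worth noting from the consumer rework above: message values are now decoded as UTF-8 JSON and the consumer joins a named group with a bounded poll interval. A producer feeding these per-job topics would therefore publish JSON-serialized values; the kafka-python sketch below shows that shape, with the broker address, topic name and payload layout being illustrative assumptions rather than the actual gstlal job output format.

# Hypothetical producer-side sketch matching the consumer configuration above:
# UTF-8 JSON values published to per-job topics (e.g. "0001"). The payload
# layout here is an assumption, not the documented gstlal message format.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
	bootstrap_servers=['localhost:9092'],                      # placeholder broker
	value_serializer=lambda v: json.dumps(v).encode('utf-8'),  # mirror of the consumer's deserializer
)
producer.send('0001', {'latency_history': {'time': [1239000000.0], 'data': [0.5]}})
producer.flush()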