Commit 037c792b authored by Patrick Godwin


remove gstlal_ll_inspiral_state from gstlal-ugly as gstlal_ll_inspiral_aggregator can now handle state aggregation as well
parent b781a226
@@ -27,7 +27,6 @@ dist_bin_SCRIPTS = \
 	gstlal_cache_to_segments \
 	gstlal_ll_inspiral_aggregator \
 	gstlal_ll_dq \
-	gstlal_ll_inspiral_state \
 	gstlal_condor_top \
 	gstlal_injsplitter \
 	gstlal_reduce_dag \
deleted file: gstlal_ll_inspiral_state
#!/usr/bin/env python
#
# Copyright (C) 2016 Kipp Cannon, Chad Hanna
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import h5py
import numpy
import sys, os
import itertools
import argparse
import lal
from lal import LIGOTimeGPS
import time
from gi.repository import GLib
import logging
import subprocess
import urllib2
import shutil
import collections
from multiprocessing import Pool
import json
from datamon import aggregator
from datamon import io
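
# datamon supplies the aggregation helpers used below (aggregator.median, aggregator.makedir,
# aggregator.job_expanse, aggregator.now) and the I/O backends (io.kafka, io.http, io.hdf5, io.influx)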
#
# =============================================================================
#
# Command Line
#
# =============================================================================
#
# Read command line options
def parse_command_line():
    parser = argparse.ArgumentParser(description="Online data aggregator")

    # directory to put everything in
    parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
    parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
    parser.add_argument("--route", action="append", help="Specify routes to download. Can be given multiple times.")
    parser.add_argument("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default: all")
    parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
    parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
    parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
    parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
    parser.add_argument("--kafka-server", action="store", help="Specify kafka server to read data from, example: 10.14.0.112:9092")
    parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
    parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
    parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
    parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
    args = parser.parse_args()

    # FIXME do error checking
    if args.data_type is None:
        args.data_type = ["min", "max", "median"]

    assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'

    return args
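
# Example invocation (hypothetical values; the flags are those defined in parse_command_line() above):
#
#   gstlal_ll_inspiral_state --job-start 0 --num-jobs 10 --route <route> --data-type max \
#       --kafka-server 10.14.0.112:9092 --data-backend influx \
#       --influx-hostname <influx-host> --influx-port 8086 --influx-database-name <db-name>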
#
# =============================================================================
#
# Main
#
# =============================================================================
#
if __name__ == '__main__':
    options = parse_command_line()

    # FIXME don't hardcode some of these?
    datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
    jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
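    # e.g. --job-start 0 --num-jobs 10 gives zero-padded job IDs "0000" through "0009"; these IDs
    # name the per-job output directories below and, if --kafka-server is set, the Kafka topics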
    routes = options.route

    for job in jobs:
        aggregator.makedir("%s/by_job/%s" % (options.base_dir, job))

    logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")

    pool = Pool(options.num_threads)
    prevdataspan = set()
    # We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
    if options.kafka_server:
        from kafka import KafkaConsumer
        consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
    else:
        consumer = None
    # Instantiate influxDB connection if data backend is influx
    if options.data_backend == 'influx':
        influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
    else:
        influx_client = None
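
    # Main polling loop: every --dump-period seconds, fetch the latest timeseries for each job
    # and route (from Kafka if configured, otherwise over HTTP), then store and reduce the data
    # with the selected backend (InfluxDB or HDF5 on disk)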
    while True:
        logging.info("sleeping")
        time.sleep(options.dump_period)
        dataspan = set()

        if consumer is not None:
            # this is not threadsafe!
            logging.info("getting data from kafka")
            timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
        else:
            timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)

        for route in routes:
            if "L1" in route or "H1" in route:
                # FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
                datadata[route] = {job: (data / 16) for job, data in datadata[route].items()}

            # First get the raw and reduced data for each job in parallel
            if influx_client:
                io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
            else:
                mapargs = [(options.base_dir, route.replace("/","_"), job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs]
                for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
                    dataspan.update(ds)
                prevdataspan = dataspan.copy()

        # Then reduce the data across jobs at each level
        mapargs = []
        for start, end in zip(*aggregator.job_expanse(dataspan)):
            mapargs = []
            for route in routes:
                mapargs.append((options.base_dir, route.replace("/","_"), jobs, 'job', datatypes, start, end))
            pool.map(io.hdf5.reduce_across_tags, mapargs)
            logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
    #
    # always end on an error so that condor won't think we're done and will
    # restart us
    #
    sys.exit(1)