From 037c792bd4616cd4cb3e7d86371870bac030c9b1 Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Sun, 10 Feb 2019 16:07:21 -0800
Subject: [PATCH] remove gstlal_ll_inspiral_state from gstlal-ugly as
 gstlal_ll_inspiral_aggregator can now handle state aggregation as well

---
 gstlal-ugly/bin/Makefile.am              |   1 -
 gstlal-ugly/bin/gstlal_ll_inspiral_state | 156 -----------------------
 2 files changed, 157 deletions(-)
 delete mode 100755 gstlal-ugly/bin/gstlal_ll_inspiral_state

diff --git a/gstlal-ugly/bin/Makefile.am b/gstlal-ugly/bin/Makefile.am
index 7297b54147..f702ef57f8 100644
--- a/gstlal-ugly/bin/Makefile.am
+++ b/gstlal-ugly/bin/Makefile.am
@@ -27,7 +27,6 @@ dist_bin_SCRIPTS = \
 	gstlal_cache_to_segments \
 	gstlal_ll_inspiral_aggregator \
 	gstlal_ll_dq \
-	gstlal_ll_inspiral_state \
 	gstlal_condor_top \
 	gstlal_injsplitter \
 	gstlal_reduce_dag \
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_state b/gstlal-ugly/bin/gstlal_ll_inspiral_state
deleted file mode 100755
index 359ab8769f..0000000000
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_state
+++ /dev/null
@@ -1,156 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2016 Kipp Cannon, Chad Hanna
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation; either version 2 of the License, or (at your
-# option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
-# Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-
-
-import h5py
-import numpy
-import sys, os
-import itertools
-import argparse
-import lal
-from lal import LIGOTimeGPS
-import time
-from gi.repository import GLib
-import logging
-import subprocess
-import urllib2
-import shutil
-import collections
-from multiprocessing import Pool
-import json
-
-from datamon import aggregator
-from datamon import io
-
-#
-# =============================================================================
-#
-#                                 Command Line
-#
-# =============================================================================
-#
-
-
-# Read command line options
-def parse_command_line():
-
-	parser = argparse.ArgumentParser(description="Online data aggregator")
-
-	# directory to put everything in
-	parser.add_argument("--base-dir", action="store", default="aggregator", help="Specify output path")
-	parser.add_argument("--job-start", type=int, help="job id to start aggregating from")
-	parser.add_argument("--route", action="append", help="Specify routes to download. Can be given multiple times.")
-	parser.add_argument("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
Default all") - parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)") - parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs") - parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).") - parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently") - parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092") - parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.") - parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.") - parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.") - parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.") - - args = parser.parse_args() - - #FIXME do error checking - if args.data_type is None: - args.data_type = ["min", "max", "median"] - - assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]' - - return args - - -# -# ============================================================================= -# -# Main -# -# ============================================================================= -# - -if __name__ == '__main__': - - options = parse_command_line() - # FIXME don't hardcode some of these? - datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type] - jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)] - routes = options.route - - for job in jobs: aggregator.makedir("%s/by_job/%s" % (options.base_dir, job)) - - logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s") - - pool = Pool(options.num_threads) - prevdataspan = set() - - # We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs - if options.kafka_server: - from kafka import KafkaConsumer - consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest') - else: - consumer = None - - # Instantiate influxDB connection if data backend is influx - if options.data_backend == 'influx': - influx_client = io.influx.create_client(options.influx_hostname, options.influx_port) - else: - influx_client = None - - while True: - logging.info("sleeping") - time.sleep(options.dump_period) - dataspan = set() - - if consumer is not None: - # this is not threadsafe! 
- logging.info("getting data from kafka") - timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes) - else: - timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads) - - for route in routes: - if "L1" in route or "H1" in route: - # FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo - datadata[route] = {job: (data / 16) for job, data in datadata[route].items()} - - # First get the raw and reduced data for each job in parallel - if influx_client: - io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job') - else: - mapargs = [(options.base_dir, route.replace("/","_"), job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs] - for ds in pool.map(io.hdf5.reduce_by_tag, mapargs): - dataspan.update(ds) - prevdataspan = dataspan.copy() - - # Then reduce the data across jobs at each level - mapargs = [] - for start, end in zip(*aggregator.job_expanse(dataspan)): - mapargs = [] - for route in routes: - mapargs.append((options.base_dir, route.replace("/","_"), jobs, 'job', datatypes, start, end)) - pool.map(io.hdf5.reduce_across_tags, mapargs) - logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now()))) - - # - # always end on an error so that condor won't think we're done and will - # restart us - # - - sys.exit(1) -- GitLab