diff --git a/gstlal-ugly/bin/gstlal_ll_dq b/gstlal-ugly/bin/gstlal_ll_dq
index f2098c8a04a8d22f0bc5e14e62501f4114ef6d89..ae2811c999262794f942df5c3c8887312b501060 100755
--- a/gstlal-ugly/bin/gstlal_ll_dq
+++ b/gstlal-ugly/bin/gstlal_ll_dq
@@ -30,9 +30,15 @@ def parse_command_line():
 	parser.add_option("--sample-rate", metavar = "Hz", default = 4096, type = "int", help = "Sample rate at which to generate the PSD, default 16384 Hz")
 	parser.add_option("--psd-fft-length", metavar = "s", default = 16, type = "int", help = "FFT length, default 8s")
 	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose (optional).")
+	parser.add_option("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+	parser.add_option("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+	parser.add_option("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	parser.add_option("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
 	options, filenames = parser.parse_args()
 
+	assert options.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
+
 	return options, filenames
 
 
 class PSDHandler(simplehandler.Handler):
@@ -40,8 +46,10 @@ class PSDHandler(simplehandler.Handler):
 		self.psd = None
 		self.out_path = kwargs["out_path"]
 		self.instrument = kwargs["instrument"]
+		self.influx_client = kwargs["influx_client"]
 		del kwargs["out_path"]
 		del kwargs["instrument"]
+		del kwargs["influx_client"]
 		simplehandler.Handler.__init__(self, *args, **kwargs)
 		self.horizon_distance_func = reference_psd.HorizonDistance(20., 2048., 1./16., 1.4, 1.4)
 
@@ -88,9 +96,14 @@ class PSDHandler(simplehandler.Handler):
 			self.last_reduce_time = int(round(buftime,-2))
 			logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(aggregator.now())))
 
-			timedata = {(self.instrument, route): numpy.array(self.timedeq) for route in self.routes}
-			datadata = {(self.instrument, route): numpy.array(self.datadeq[route]) for route in self.routes}
-			self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, self.routes, self.instrument, 'instrument', self.datatypes, timedata, datadata, self.prevdataspan))
+			timedata = {route: {self.instrument: numpy.array(self.timedeq)} for route in self.routes}
+			datadata = {route: {self.instrument: numpy.array(self.datadeq[route])} for route in self.routes}
+
+			for route in self.routes:
+				if self.influx_client:
+					io.influx.store_and_reduce_timeseries(self.influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
+				else:
+					self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, route, self.instrument, 'instrument', self.datatypes, timedata[route], datadata[route], self.prevdataspan))
 
 			# Save a "latest" psd
 			# NOTE: The PSD is special, we just record it.  No min/median/max
@@ -125,6 +138,12 @@ options, filenames = parse_command_line()
 
 logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
+# Instantiate influxDB connection if data backend is influx
+if options.data_backend == 'influx':
+	influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
+else:
+	influx_client = None
+
 # parse the generic "source" options, check for inconsistencies is done inside
 # the class init method
 gw_data_source_info = datasource.GWDataSourceInfo(options)
@@ -142,7 +161,7 @@ if options.verbose:
 	print >>sys.stderr, "building pipeline ..."
 mainloop = GObject.MainLoop()
 pipeline = Gst.Pipeline(name="DQ")
-handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument)
+handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument, influx_client = influx_client)
 
 head, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = options.verbose)
 head = pipeparts.mkresample(pipeline, head, quality = 9)
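Note: the reduction hunk above changes the in-memory keying from (instrument, route) tuples to a route -> instrument nesting, which is the shape both backends now consume. A minimal runnable sketch of that layout; the route names and values are hypothetical, not part of the patch:

import numpy

instrument = "H1"
routes = ["noise", "range_history"]            # hypothetical route names
timedeq = [1187000000, 1187000001]             # hypothetical GPS timestamps
datadeq = {"noise": [1.0, 1.1], "range_history": [100.0, 101.0]}

# route -> {instrument -> array}, as built by PSDHandler above
timedata = {route: {instrument: numpy.array(timedeq)} for route in routes}
datadata = {route: {instrument: numpy.array(datadeq[route])} for route in routes}

assert list(timedata["noise"]) == ["H1"]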
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
index 038f9ae55ed55f6a786f4014a4696c50bfc2afb5..992a207ce65f88bcfc38405e87a9366e75453d8b 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_aggregator
@@ -65,14 +65,20 @@ def parse_command_line():
 	parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
 	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently, default 16.")
 	parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
+	parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+	parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
 	args = parser.parse_args()
 
 	#FIXME do error checking
 	if args.data_type is None:
 		args.data_type = ["min", "max", "median"]
 
-	return args
+	assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
+
+	return args
 
 
 #
@@ -103,6 +109,12 @@ if __name__ == '__main__':
 	else:
 		consumer = None
 
+	# Instantiate influxDB connection if data backend is influx
+	if options.data_backend == 'influx':
+		influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
+	else:
+		influx_client = None
+
 	# start an infinite loop to keep updating and aggregating data
 	while True:
 		logging.info("sleeping")
@@ -116,20 +128,24 @@ if __name__ == '__main__':
 		else:
 			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_id, num_threads=options.num_threads)
 
-		# First get the raw and reduced data for each job in parallel
-		mapargs = [(options.base_dir, routes, job, 'job', datatypes, timedata, datadata, prevdataspan) for job in jobs]
-		for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
-			dataspan.update(ds)
-		prevdataspan = dataspan.copy()
-
-		# Then reduce the data across jobs at each level
-		mapargs = []
-		for start, end in zip(*aggregator.job_expanse(dataspan)):
-			mapargs = []
-			for route in routes:
-				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
-			pool.map(io.hdf5.reduce_across_tags, mapargs)
-			logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
+		for route in routes:
+			# First get the raw and reduced data for each job in parallel
+			if influx_client:
+				io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
+			else:
+				mapargs = [(options.base_dir, route, job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs]
+				for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
+					dataspan.update(ds)
+				prevdataspan = dataspan.copy()
+
+		# Then reduce the data across jobs at each level
+		mapargs = []
+		for start, end in zip(*aggregator.job_expanse(dataspan)):
+			mapargs = []
+			for route in routes:
+				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
+			pool.map(io.hdf5.reduce_across_tags, mapargs)
+			logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
 
 #
 # always end on an error so that condor won't think we're done and will
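Note: the hdf5 path keeps its pool-based fan-out, but each io.hdf5.reduce_by_tag call now covers a single route rather than the whole route list. A runnable toy of that per-route fan-out, with a stub standing in for io.hdf5.reduce_by_tag; all names and values here are illustrative, not the real API:

from multiprocessing.dummy import Pool  # thread pool, as in the scripts

def reduce_by_tag(args):
	# stub: the real io.hdf5.reduce_by_tag reduces one job's series for one
	# route on disk and returns the span of data it touched
	base_dir, route, job, tag, datatypes, timedata, datadata, prevspan = args
	return {(route, job)}

pool = Pool(4)
jobs = ["0001", "0002"]                        # hypothetical job tags
routes = ["latency_history", "snr_history"]    # hypothetical routes
dataspan = set()
for route in routes:
	mapargs = [("base_dir", route, job, "job", None, {}, {}, set()) for job in jobs]
	for ds in pool.map(reduce_by_tag, mapargs):
		dataspan.update(ds)
print(sorted(dataspan))  # one reduction per (route, job) pair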
diff --git a/gstlal-ugly/bin/gstlal_ll_inspiral_state b/gstlal-ugly/bin/gstlal_ll_inspiral_state
index d7b10d47ff6662f14309dffff27c0080aa834cee..6c04dacb496f60546ab5d20460160fed60293803 100755
--- a/gstlal-ugly/bin/gstlal_ll_inspiral_state
+++ b/gstlal-ugly/bin/gstlal_ll_inspiral_state
@@ -59,11 +59,16 @@ def parse_command_line():
 	parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
 	parser.add_argument("--instrument", action = "append", help = "Number of threads to use concurrently")
 	parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
+	parser.add_argument("--data-backend", default="hdf5", help = "Choose the backend for data to be stored into, options: [hdf5|influx]. default = hdf5.")
+	parser.add_argument("--influx-hostname", help = "Specify the hostname for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-port", help = "Specify the port for the influxDB database. Required if --data-backend = influx.")
+	parser.add_argument("--influx-database-name", help = "Specify the database name for the influxDB database. Required if --data-backend = influx.")
 	args = parser.parse_args()
 
-	return args
+	assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
+	return args
 
 
 #
@@ -92,29 +97,60 @@ if __name__ == '__main__':
 
 	logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
 
+	pool = Pool(options.num_threads)
 	prevdataspan = set()
+
+	# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
 	if options.kafka_server:
 		from kafka import KafkaConsumer
 		consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
 	else:
 		consumer = None
+
+	# Instantiate influxDB connection if data backend is influx
+	if options.data_backend == 'influx':
+		influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
+	else:
+		influx_client = None
+
 	while True:
 		logging.info("sleeping")
 		time.sleep(options.dump_period)
+		dataspan = set()
 
-		if consumer:
+		if consumer is not None:
 			# this is not threadsafe!
 			logging.info("getting data from kafka")
-			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, req_all = True)
-			for (job,route) in timedata:
-				if "L1" in route or "H1" in route:
-					# FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
-					datadata[(job,route)] /= 16
+			timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
 		else:
-			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads = options.num_threads)
-
-		for (job, route) in timedata:
-			path = "%s/by_job/%s" % (options.base_dir, job)
-			io.hdf5.store_timeseries(path, route.replace("/","_"), timedata[(job, route)], datadata[(job, route)])
+			timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads=options.num_threads)
+
+		for route in routes:
+			if "L1" in route or "H1" in route:
+				# FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
+				datadata[route] = {job: (data / 16) for job, data in datadata[route].items()}
+
+			# First get the raw and reduced data for each job in parallel
+			if influx_client:
+				io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
+			else:
+				mapargs = [(options.base_dir, route.replace("/","_"), job, 'job', datatypes, timedata[route], datadata[route], prevdataspan) for job in jobs]
+				for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
+					dataspan.update(ds)
+				prevdataspan = dataspan.copy()
+
+		# Then reduce the data across jobs at each level
+		mapargs = []
+		for start, end in zip(*aggregator.job_expanse(dataspan)):
+			mapargs = []
+			for route in routes:
+				mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
+			pool.map(io.hdf5.reduce_across_tags, mapargs)
+			logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
+
+	#
+	# always end on an error so that condor won't think we're done and will
+	# restart us
+	#
 
 	sys.exit(1)
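Note: a small worked example of the H1/L1 adjustment above. ALIGO state data arrives at 16 Hz versus Virgo's 1 Hz, so per-job arrays on H1/L1 routes are scaled by 1/16 to stay comparable across detectors. Route names, job tags, and values are hypothetical:

import numpy

# datadata maps route -> {job: array}, as retrieved in the script above
datadata = {
	"H1/strain_dropped": {"0001": numpy.array([32.0, 48.0])},
	"V1/strain_dropped": {"0001": numpy.array([2.0, 3.0])},
}

for route in datadata:
	if "L1" in route or "H1" in route:
		# same adjustment as the FIXME hack in the patch
		datadata[route] = {job: data / 16 for job, data in datadata[route].items()}

print(datadata["H1/strain_dropped"]["0001"])  # [ 2.  3.] -- now on V1's scale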