Commit 71debb64 authored by Patrick Godwin's avatar Patrick Godwin Committed by Chad Hanna

update gstlal_ll_dq, gstlal_ll_inspiral_aggregator to work with API change in...

update gstlal_ll_dq, gstlal_ll_inspiral_aggregator to work with API change in ligo-scald introduced in 0.2
parent 352fa335
......@@ -131,7 +131,7 @@ class PSDHandler(simplehandler.Handler):
### store and reduce noise / range history
for route in self.routes:
agg_sink.store_columns(route, data[route], 'data', tags='ifo', aggregate="max")
agg_sink.store_columns(route, data[route], aggregate="max")
### flush buffers
self.timedeq.clear()
......@@ -140,7 +140,7 @@ class PSDHandler(simplehandler.Handler):
# Save a "latest" psd
# NOTE: The PSD is special, we just record it. No min/median/max
thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime))
thisdir = os.path.join(self.out_path, io.common.gps_to_leaf_directory(buftime))
aggregator.makedir(thisdir)
psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
......@@ -178,9 +178,13 @@ if __name__ == '__main__':
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
else: ### hdf5 data backend
agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.out_path, num_processes=options.num_threads)
agg_sink = io.hdf5.Aggregator(rootdir=options.out_path, num_processes=options.num_threads)
# register measurement schemas for aggregators
for route in ('noise', 'range_history'):
agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')
# parse the generic "source" options, check for inconsistencies is done inside
# the class init method
......
......@@ -98,11 +98,15 @@ if __name__ == '__main__':
else:
consumer = None
# set up aggregator sink
# set up aggregator sink
if options.data_backend == 'influx':
agg_sink = io.influx.InfluxDBAggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
agg_sink = io.influx.Aggregator(hostname=options.influx_hostname, port=options.influx_port, db=options.influx_database_name)
else: ### hdf5 data backend
agg_sink = io.hdf5.HDF5Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
agg_sink = io.hdf5.Aggregator(rootdir=options.base_dir, num_processes=options.num_threads)
# register measurement schemas for aggregators
for route in routes:
agg_sink.register_schema(route, columns='data', column_key='data', tags='job', tag_key='job')
# start an infinite loop to keep updating and aggregating data
while True:
......@@ -125,7 +129,7 @@ if __name__ == '__main__':
for route in routes:
logging.info("storing and reducing timeseries for measurement: %s" % route)
for aggregate in options.data_type:
agg_sink.store_columns(route, datadata[route], 'data', tags='job', aggregate=aggregate)
agg_sink.store_columns(route, datadata[route], aggregate=aggregate)
elapsed = timeit.default_timer() - start
logging.info("time to store/reduce timeseries: %.1f s" % elapsed)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment