Commit 4bf95a83 authored by Patrick Godwin

inspiral aggregators: fix issues with hdf5 aggregation, add aggregation to gstlal_ll_inspiral_state

parent 3401283d
@@ -101,13 +101,13 @@ class PSDHandler(simplehandler.Handler):
     for route in self.routes:
         if self.influx_client:
-            io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'job')
+            io.influx.store_and_reduce_timeseries(influx_client, options.influx_database_name, route, timedata[route], datadata[route], 'data', 'ifo')
         else:
-            self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, route, self.instrument, 'instrument', self.datatypes, timedata[route], datadata[route], self.prevdataspan))
+            self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, route, self.instrument, 'ifo', self.datatypes, timedata[route], datadata[route], self.prevdataspan))
     # Save a "latest" psd
     # NOTE: The PSD is special, we just record it. No min/median/max
-    thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime), 'by_instrument', self.instrument)
+    thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime))
     psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
     self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
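Note on the last change above: the "latest" PSD now lands directly in the GPS leaf directory rather than a per-instrument subdirectory, and its filename carries a 100-second GPS stamp produced by round(buftime, -2). A minimal sketch of that naming convention (the helper function below is hypothetical, not part of the patch):

def latest_psd_name(instrument, buftime):
    # round(buftime, -2) snaps the GPS time to the nearest multiple of 100 s,
    # e.g. 1187008882 -> 1187008900, so dumps within the same 100 s window
    # map to the same file
    return "%s-PSD-%d-100.hdf5" % (instrument, int(round(buftime, -2)))

assert latest_psd_name("H1", 1187008882) == "H1-PSD-1187008900-100.hdf5"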
@@ -141,7 +141,7 @@ logging.basicConfig(level = logging.INFO, format = "%(asctime)s %(levelname)s:%(
 # Instantiate influxDB connection if data backend is influx
 if options.data_backend == 'influx':
     influx_client = io.influx.create_client(options.influx_hostname, options.influx_port)
-else
+else:
     influx_client = None
 
 # parse the generic "source" options, check for inconsistencies is done inside
@@ -151,8 +151,6 @@ gw_data_source_info = datasource.GWDataSourceInfo(options)
 # only support one channel
 instrument = gw_data_source_info.channel_dict.keys()[0]
-aggregator.makedir(os.path.join(options.out_path, instrument))
-
 #
 # build pipeline
 #
@@ -145,7 +145,7 @@ if __name__ == '__main__':
         for route in routes:
             mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
         pool.map(io.hdf5.reduce_across_tags, mapargs)
-        logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
+        logging.info("processed reduced data in [%d %d) at %d for route %s" % (int(start), int(end), int(aggregator.now()), route))
 
 #
 # always end on an error so that condor won't think we're done and will
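Pool.map-style interfaces hand a single object to the worker, which is presumably why each route's arguments are packed into one tuple above. An illustrative sketch of the same fan-out pattern with a stand-in worker (names and values are made up, not the gstlal API):

from multiprocessing import Pool

def reduce_one(args):
    # stand-in for io.hdf5.reduce_across_tags: unpack the single packed tuple
    base_dir, route, jobs, tag, datatypes, start, end = args
    return "%s reduced over [%d, %d)" % (route, start, end)

if __name__ == '__main__':
    mapargs = [("/tmp/agg", route, ["0000", "0001"], "job", ["min", "max", "median"], 1187000000, 1187000100)
               for route in ("ram_history", "another_route")]
    with Pool(2) as pool:
        for line in pool.map(reduce_one, mapargs):   # one task per route, run in parallel
            print(line)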
@@ -56,6 +56,7 @@ def parse_command_line():
     parser.add_argument("--dump-period", type = float, default = 180., help = "Wait this many seconds between dumps of the URLs (default = 180., set to 0 to disable)")
     parser.add_argument("--num-jobs", action="store", type=int, default=10, help="number of running jobs")
     parser.add_argument("--job-tag", help = "Collect URLs for jobs reporting this job tag (default = collect all gstlal_inspiral URLs).")
+    parser.add_argument("--data-type", action="append", help="Specify datatypes to aggregate from 'min', 'max', 'median'. Can be given multiple times. Default all")
     parser.add_argument("--num-threads", type = int, default = 16, help = "Number of threads to use concurrently")
     parser.add_argument("--instrument", action = "append", help = "Number of threads to use concurrently")
     parser.add_argument("--kafka-server", action="store", help="Specify kakfa server to read data from, example: 10.14.0.112:9092")
@@ -66,6 +67,10 @@ def parse_command_line():
     args = parser.parse_args()
 
     #FIXME do error checking
+    if args.data_type is None:
+        args.data_type = ["min", "max", "median"]
+
     assert args.data_backend in ('hdf5', 'influx'), '--data-backend must be one of [hdf5|influx]'
 
     return args
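With action="append" the parsed attribute is None when --data-type is never given, so the default list is filled in after parse_args(); putting default=[...] on the argument itself would instead be appended to by any explicit flags. A minimal, self-contained sketch of the behaviour:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--data-type", action="append",
                    help="one of 'min', 'max', 'median'; may be given multiple times")

args = parser.parse_args(["--data-type", "max"])
if args.data_type is None:
    args.data_type = ["min", "max", "median"]   # default: aggregate everything
print(args.data_type)                           # ['max']

args = parser.parse_args([])
if args.data_type is None:
    args.data_type = ["min", "max", "median"]
print(args.data_type)                           # ['min', 'max', 'median']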
@@ -82,6 +87,8 @@ def parse_command_line():
 if __name__ == '__main__':
     options = parse_command_line()
 
     # FIXME don't hardcode some of these?
+    datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
     jobs = ["%04d" % b for b in numpy.arange(0, options.num_jobs)]
     routes = ["ram_history"]
     for ifo in options.instrument:
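The new datatypes line keeps only the (name, reduction function) pairs whose name was requested on the command line. A sketch of that filter, with numpy.median standing in for gstlal's aggregator.median (assumed to reduce a 1-D array to its median in the same way):

import numpy

available = [("min", min), ("max", max), ("median", numpy.median)]
requested = ["max", "median"]                   # e.g. options.data_type
datatypes = [x for x in available if x[0] in requested]

for name, reduce_fn in datatypes:
    print(name, reduce_fn([3.0, 1.0, 2.0]))     # max 3.0, median 2.0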
@@ -144,7 +151,7 @@ if __name__ == '__main__':
     for start, end in zip(*aggregator.job_expanse(dataspan)):
         mapargs = []
         for route in routes:
-            mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
+            mapargs.append((options.base_dir, route.replace("/","_"), jobs, 'job', datatypes, start, end))
         pool.map(io.hdf5.reduce_across_tags, mapargs)
         logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
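The only change here swaps route for route.replace("/","_"), presumably so a route name containing "/" is not treated as an extra path component when the reduced HDF5 files are located on disk. A trivial illustration (the route name is made up):

route = "H1/ram_history"            # hypothetical route containing a slash
print(route.replace("/", "_"))      # -> "H1_ram_history"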