Commit 66054555 authored by Patrick Godwin

gstlal-ugly: port aggregation code to use datamon functions, tidy up

parent 1955e816
@@ -6,7 +6,7 @@ from collections import deque
from scipy import signal
import sys
import StringIO
from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd, aggregator
from gstlal import pipeparts, datasource, simplehandler, pipeio, reference_psd
from optparse import OptionParser
import gi
gi.require_version('Gst', '1.0')
@@ -16,6 +16,9 @@ Gst.init(None)
import h5py
import logging
from datamon import aggregator
from datamon import io
def parse_command_line():
parser = OptionParser(description = __doc__)
@@ -41,11 +44,14 @@ class PSDHandler(simplehandler.Handler):
del kwargs["instrument"]
simplehandler.Handler.__init__(self, *args, **kwargs)
self.horizon_distance_func = reference_psd.HorizonDistance(20., 2048., 1./16., 1.4, 1.4)
self.range_history = deque(maxlen = 10000)
self.range_history_time = deque(maxlen = 10000)
self.noisedeq = deque(maxlen = 10000)
self.routes = ("noise", "range_history")
self.datatypes = (("min", min), ("median", aggregator.median), ("max", max))
self.timedeq = deque(maxlen = 10000)
self.datadeq = {route: deque(maxlen = 10000) for route in self.routes}
self.last_reduce_time = None
self.prevdataspan = set()
def do_on_message(self, bus, message):
if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
self.psd = pipeio.parse_spectrum_message(message)
@@ -58,19 +64,16 @@ class PSDHandler(simplehandler.Handler):
if self.last_reduce_time is None:
self.last_reduce_time = int(round(buftime,-2))
(result, mapinfo) = buf.map(Gst.MapFlags.READ)
thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
for typ in ("min", "median", "max"):
aggregator.makedir(os.path.join(thisdir, typ))
if mapinfo.data:
# First noise
s = StringIO.StringIO(mapinfo.data)
data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().split('\n') if x])
ix = numpy.argmax(data, axis=0)[1]
self.timedeq.append(buftime)
self.noisedeq.append(data[ix,1])
self.datadeq['noise'].append(data[ix,1])
# Then range
self.range_history.append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
# The PSD
psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
@@ -80,35 +83,19 @@ class PSDHandler(simplehandler.Handler):
del buf
return Gst.FlowReturn.OK
# Save a "latest"
self.to_hdf5(os.path.join(os.path.join(self.out_path, self.instrument), "psd.hdf5"), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
# write out all of the file types
for typ in ("min", "median", "max"):
self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "noise.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.noisedeq)})
self.to_hdf5(os.path.join("%s/%s" % (thisdir, typ), "range_history.hdf5"), {"time": numpy.array(self.timedeq), "data": numpy.array(self.range_history)})
#
# FIXME do data reduction by levels here.
#
# Only reduce every 100s
if (buftime - self.last_reduce_time) >= 100:
logging.info("reducing data and writing PSD snaphot for %d @ %d" % (buftime, int(aggregator.now())))
self.last_reduce_time = int(round(buftime,-2))
for typ, func in (("min", min), ("median", aggregator.median), ("max", max)):
for route in ("noise", "range_history"):
for level in range(0, aggregator.DIRS-1):
thisdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level)), self.instrument), typ)
nextdir = os.path.join(os.path.join(self.out_path, os.path.join(aggregator.gps_to_leaf_directory(buftime, level = level+1)), self.instrument), typ)
aggregator.makedir(nextdir)
this_fname, this_x, this_y = aggregator.get_dataset(thisdir, route)
next_fname, next_x, next_y = aggregator.get_dataset(nextdir, route)
xarr, yarr = aggregator.reduce_data(numpy.concatenate((this_x,next_x)), numpy.concatenate((this_y, next_y)), func, level = level + 1)
self.to_hdf5(next_fname, {"time": numpy.array(xarr), "data": numpy.array(yarr)})
# The PSD is special, we just record it. No min/median/max
thisdir = os.path.join(os.path.join(self.out_path, aggregator.gps_to_leaf_directory(buftime)), self.instrument)
psd_name = "%s-PSD-%d0-100.hdf5" % (self.instrument, int(round(buftime,-2)))
logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(aggregator.now())))
timedata = {(self.instrument, route): numpy.array(self.timedeq) for route in self.routes}
datadata = {(self.instrument, route): numpy.array(self.datadeq[route]) for route in self.routes}
self.prevdataspan = io.hdf5.reduce_by_tag((self.out_path, self.routes, self.instrument, 'instrument', self.datatypes, timedata, datadata, self.prevdataspan))
# Save a "latest" psd
# NOTE: The PSD is special, we just record it. No min/median/max
thisdir = os.path.join(self.out_path, io.hdf5.gps_to_leaf_directory(buftime), 'by_instrument', self.instrument)
psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
buf.unmap(mapinfo)
@@ -122,7 +109,6 @@ class PSDHandler(simplehandler.Handler):
def to_hdf5(self, path, datadict):
tmppath = "/dev/shm/%s" % path.replace("/","_") + ".tmp"
print tmppath
f = h5py.File(tmppath, "w")
for k, v in datadict.items():
f[k] = v
......
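
For context, the new PSDHandler reduction path boils down to the pattern below. This is a minimal sketch that uses only the datamon calls visible in this diff; the instrument name, output path and sample values are placeholders, not part of the commit.

from collections import deque
import numpy
from datamon import aggregator, io

instrument = "H1"                      # placeholder tag
out_path = "."                         # placeholder output directory
routes = ("noise", "range_history")
datatypes = (("min", min), ("median", aggregator.median), ("max", max))

timedeq = deque(maxlen = 10000)
datadeq = {route: deque(maxlen = 10000) for route in routes}
prevdataspan = set()

# per buffer, the handler appends the buffer time and one value per route
timedeq.append(1187000000)             # placeholder GPS time
datadeq["noise"].append(1e-23)         # placeholder peak noise amplitude
datadeq["range_history"].append(100.)  # placeholder horizon distance / 2.25

# every ~100 s, pack the deques into (tag, route)-keyed dicts and reduce
timedata = {(instrument, route): numpy.array(timedeq) for route in routes}
datadata = {(instrument, route): numpy.array(datadeq[route]) for route in routes}
prevdataspan = io.hdf5.reduce_by_tag((out_path, routes, instrument, 'instrument', datatypes, timedata, datadata, prevdataspan))
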
@@ -32,9 +32,11 @@ import urllib2
import shutil
import collections
from multiprocessing import Pool
from gstlal import aggregator
import json
from datamon import aggregator
from datamon import io
MIN_TIME_QUANTA = 10000
DIRS = 6
@@ -82,10 +84,9 @@ def parse_command_line():
#
if __name__ == '__main__':
options = parse_command_line()
# FIXME don't hardcode some of these?
# FIXME don't hardcode some of these?
datatypes = [x for x in [("min", min), ("max", max), ("median", aggregator.median)] if x[0] in options.data_type]
jobs = ["%04d" % b for b in numpy.arange(options.job_start, options.job_start + options.num_jobs)]
routes = options.route
@@ -94,12 +95,14 @@ if __name__ == '__main__':
pool = Pool(options.num_threads)
prevdataspan = set()
# We instantiate a single - NOT THREAD SAFE - consumer to subscribe to all of our topics, i.e., jobs
if options.kafka_server is not None:
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
else:
consumer = None
# start an infinite loop to keep updating and aggregating data
while True:
logging.info("sleeping")
@@ -109,29 +112,23 @@ if __name__ == '__main__':
if consumer is not None:
# this is not threadsafe!
logging.info("getting data from kafka")
timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer)
timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes)
else:
timedata, datadata = None, None
timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_id, num_threads=options.num_threads)
# First get the raw and reduced data for each job in parallel
mapargs = [(job, options.job_tag, routes, datatypes, prevdataspan, options.base_dir, jobs, timedata, datadata) for job in jobs]
for ds in pool.map(aggregator.get_data_from_job_and_reduce, mapargs):
#for ds in map(aggregator.get_data_from_job_and_reduce, mapargs):
mapargs = [(options.base_dir, routes, job, 'job', datatypes, timedata, datadata, prevdataspan) for job in jobs]
for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
dataspan.update(ds)
prevdataspan = dataspan.copy()
# Then reduce the data across jobs at each level
mapargs = []
for start, end in zip(*aggregator.job_expanse(dataspan)):
# FIXME don't hardcode this range
for level in range(DIRS):
this_level_dir = "/".join([options.base_dir, aggregator.gps_to_leaf_directory(start, level = level)])
mapargs = []
for route in routes:
for (typ,func) in datatypes:
aggregator.setup_dir_across_job_by_level(start, typ, route, options.base_dir, verbose = True, level = level)
mapargs.append((jobs, this_level_dir, typ, route, func, level, start, end))
pool.map(aggregator.reduce_across_jobs, mapargs)
#map(aggregator.reduce_across_jobs, mapargs)
mapargs = []
for route in routes:
mapargs.append((options.base_dir, route, jobs, 'job', datatypes, start, end))
pool.map(io.hdf5.reduce_across_tags, mapargs)
logging.info("processed reduced data in [%d %d) at %d" % (int(start), int(end), int(aggregator.now())))
#
......
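
For context, the new aggregator main loop reduces to the pattern below: retrieve (job, route)-keyed timeseries, reduce each job ("tag") in parallel, then reduce across jobs per route. This is a minimal sketch using only the datamon calls in this diff; the base directory, job tag, job ids, route name and pool size are placeholders.

from multiprocessing import Pool
from datamon import aggregator, io

base_dir = "aggregator_base"           # placeholder
job_tag = "online_analysis"            # placeholder
jobs = ["0001", "0002"]                # placeholder job ids
routes = ["some_route"]                # placeholder route name
datatypes = [("min", min), ("median", aggregator.median), ("max", max)]
prevdataspan = set()
pool = Pool(2)

# pull the raw timeseries; with a Kafka consumer this would instead be
# io.kafka.retrieve_timeseries(consumer, jobs, routes)
timedata, datadata = io.http.retrieve_timeseries(base_dir, jobs, routes, job_tag, num_threads = 2)

# first reduce the raw data for each job in parallel
dataspan = set()
mapargs = [(base_dir, routes, job, 'job', datatypes, timedata, datadata, prevdataspan) for job in jobs]
for ds in pool.map(io.hdf5.reduce_by_tag, mapargs):
    dataspan.update(ds)
prevdataspan = dataspan.copy()

# then reduce the data across jobs for each route over the affected span
for start, end in zip(*aggregator.job_expanse(dataspan)):
    mapargs = [(base_dir, route, jobs, 'job', datatypes, start, end) for route in routes]
    pool.map(io.hdf5.reduce_across_tags, mapargs)
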
@@ -32,9 +32,10 @@ import urllib2
import shutil
import collections
from multiprocessing import Pool
from gstlal import aggregator
import json
from datamon import aggregator
from datamon import io
#
# =============================================================================
@@ -73,18 +74,6 @@ def parse_command_line():
# =============================================================================
#
def get_data_from_route((job, job_tag, routes, basedir)):
with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
url = f.readline().strip()
for route in routes:
logging.info("processing job %s for route %s : %s" % (job, route, url))
data = aggregator.get_url(url, route)
jobtime, jobdata = data[0], data[1]
path = "%s/by_job/%s" % (basedir, job)
tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = jobtime, data = jobdata, tmp = True)
shutil.move(tmpfname, fname)
if __name__ == '__main__':
options = parse_command_line()
@@ -108,7 +97,6 @@ if __name__ == '__main__':
from kafka import KafkaConsumer
consumer = KafkaConsumer(*jobs, bootstrap_servers=[options.kafka_server], value_deserializer=lambda m: json.loads(m.decode('ascii')), auto_offset_reset='latest')
else:
pool = Pool(options.num_threads)
consumer = None
while True:
logging.info("sleeping")
@@ -117,17 +105,16 @@ if __name__ == '__main__':
if consumer:
# this is not threadsafe!
logging.info("getting data from kafka")
timedata, datadata = aggregator.get_data_from_kafka(jobs, routes, consumer, req_all = True)
timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, req_all = True)
for (job,route) in timedata:
if "L1" in route or "H1" in route:
# FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
datadata[(job,route)] /= 16
path = "%s/by_job/%s" % (options.base_dir, job)
tmpfname, fname = aggregator.create_new_dataset(path, route.replace("/","_"), timedata = timedata[(job,route)], data = datadata[(job,route)], tmp = True)
shutil.move(tmpfname, fname)
else:
mapargs = [(job, options.job_tag, routes, options.base_dir) for job in jobs]
pool.map(get_data_from_route, mapargs)
timedata, datadata = io.http.retrieve_timeseries(options.base_dir, jobs, routes, options.job_tag, num_threads = options.num_threads)
for (job, route) in timedata:
path = "%s/by_job/%s" % (options.base_dir, job)
io.hdf5.store_timeseries(path, route.replace("/","_"), timedata[(job, route)], datadata[(job, route)])
sys.exit(1)
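
For context, a minimal sketch of the new per-job storage path: retrieve the raw timeseries over Kafka and store them with io.hdf5.store_timeseries as the non-Kafka branch above does. Only the datamon and kafka-python calls visible in this diff are used; the bootstrap server, job ids and route name are placeholders, and the 16 Hz normalization is the FIXME hack carried over from the Kafka branch.

import json
from kafka import KafkaConsumer
from datamon import io

base_dir = "aggregator_base"           # placeholder
jobs = ["0001"]                        # placeholder job ids
routes = ["H1/some_route"]             # placeholder route name
consumer = KafkaConsumer(*jobs, bootstrap_servers = ["localhost:9092"], value_deserializer = lambda m: json.loads(m.decode('ascii')), auto_offset_reset = 'latest')  # placeholder server

timedata, datadata = io.kafka.retrieve_timeseries(consumer, jobs, routes, req_all = True)
for (job, route) in timedata:
    if "L1" in route or "H1" in route:
        # FIXME hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
        datadata[(job, route)] /= 16
    path = "%s/by_job/%s" % (base_dir, job)
    io.hdf5.store_timeseries(path, route.replace("/", "_"), timedata[(job, route)], datadata[(job, route)])
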