Skip to content
Snippets Groups Projects

Plot horizon distance from ranking statistics

Merged ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
1 file
+ 51
92
Compare changes
  • Side-by-side
  • Inline
+ 51
92
@@ -31,16 +31,11 @@ import numpy
from scipy import signal
import yaml
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
from ligo.scald import utils
from ligo.scald.io import core, influx
from gstlal import pipeparts, datasource, simplehandler, pipeio, psd
from gstlal.stream import MessageType, Stream
#
# =============================================================================
@@ -67,16 +62,12 @@ def parse_command_line():
return options, filenames
class PSDHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs):
class NoiseTracker(object):
def __init__(self, out_path, instrument, agg_sink):
self.psd = None
self.out_path = kwargs["out_path"]
self.instrument = kwargs["instrument"]
self.agg_sink = kwargs["agg_sink"]
del kwargs["out_path"]
del kwargs["instrument"]
del kwargs["agg_sink"]
simplehandler.Handler.__init__(self, *args, **kwargs)
self.out_path = out_path
self.instrument = instrument
self.agg_sink = agg_sink
self.horizon_distance_func = psd.HorizonDistance(15., 900., 1./16., 1.4, 1.4)
self.routes = ("noise", "range_history")
@@ -85,41 +76,33 @@ class PSDHandler(simplehandler.Handler):
self.last_reduce_time = None
self.prevdataspan = set()
def do_on_message(self, bus, message):
if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
self.psd = pipeio.parse_spectrum_message(message)
return True
return False
def on_spectrum_message(self, message):
self.psd = pipeio.parse_spectrum_message(message)
return True
def bufhandler(self,elem):
buf = elem.emit("pull-sample").get_buffer()
buftime = int(buf.pts / 1e9)
def on_buffer(self, buf):
if self.last_reduce_time is None:
self.last_reduce_time = int(round(buftime,-2))
(result, mapinfo) = buf.map(Gst.MapFlags.READ)
if mapinfo.data:
# First noise
s = io.BytesIO(mapinfo.data)
data = numpy.array([(float(x.split()[0]), abs(float(x.split()[1]))) for x in s.getvalue().decode('utf-8').split('\n') if x])
ix = numpy.argmax(data, axis=0)[1]
self.timedeq.append(buftime)
self.datadeq['noise'].append(data[ix,1])
# Then range
self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
# The PSD
psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
psd_data = signal.decimate(self.psd.data.data[:], 4, ftype='fir')[:-1]**.5
else:
buf.unmap(mapinfo)
del buf
return Gst.FlowReturn.OK
self.last_reduce_time = int(round(buf.t0, -2))
if buf.data is None:
return
logging.debug(f"found buffer at t = {buf.t0}")
# First noise
ix = numpy.argmax(buf.data, axis=0)[1]
self.timedeq.append(buf.t0)
self.datadeq['noise'].append(buf.data[ix,1])
# Then range
self.datadeq['range_history'].append(self.horizon_distance_func(self.psd, 8)[0] / 2.25)
# The PSD
psd_freq = numpy.arange(self.psd.data.length / 4) * self.psd.deltaF * 4
psd_data = signal.decimate(self.psd.data.data[:], 4, ftype='fir', zero_phase=False)[:-1]**.5
# Only reduce every 100s
if (buftime - self.last_reduce_time) >= 100:
self.last_reduce_time = int(round(buftime,-2))
logging.debug("reducing data and writing PSD snapshot for %d @ %d" % (buftime, int(utils.gps_now())))
if (buf.t0 - self.last_reduce_time) >= 100:
self.last_reduce_time = int(round(buf.t0, -2))
logging.info("reducing data and writing PSD snapshot for %d @ %d" % (buf.t0, int(utils.gps_now())))
data = {route: {self.instrument: {'time': list(self.timedeq), 'fields': {'data': list(self.datadeq[route])}}} for route in self.routes}
@@ -134,19 +117,10 @@ class PSDHandler(simplehandler.Handler):
# Save a "latest" psd
# NOTE: The PSD is special, we just record it. No min/median/max
thisdir = os.path.join(self.out_path, core.gps_to_leaf_directory(buftime))
thisdir = os.path.join(self.out_path, core.gps_to_leaf_directory(buf.t0))
core.makedir(thisdir)
psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buftime,-2)))
self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buftime])})
buf.unmap(mapinfo)
del buf
return Gst.FlowReturn.OK
def prehandler(self,elem):
buf = elem.emit("pull-preroll")
del buf
return Gst.FlowReturn.OK
psd_name = "%s-PSD-%d-100.hdf5" % (self.instrument, int(round(buf.t0, -2)))
self.to_hdf5(os.path.join(thisdir, psd_name), {"freq": psd_freq, "asd": psd_data, "time": numpy.array([buf.t0])})
def to_hdf5(self, path, datadict):
tmppath = "/dev/shm/%s" % path.replace("/","_") + ".tmp"
@@ -169,7 +143,7 @@ if __name__ == '__main__':
options, filenames = parse_command_line()
log_level = logging.DEBUG if options.verbose else logging.INFO
logging.basicConfig(level = log_level, format = "%(asctime)s %(levelname)s:%(processName)s(%(process)d):%(funcName)s: %(message)s")
logging.basicConfig(level = log_level, format = "%(asctime)s | gstlal_ll_dq : %(levelname)s : %(message)s")
# set up aggregator sink
with open(options.scald_config, 'r') as f:
@@ -186,45 +160,30 @@ if __name__ == '__main__':
# only support one channel
instrument = list(gw_data_source_info.channel_dict.keys())[0]
# set up noise tracker
tracker = NoiseTracker(options.out_path, instrument, agg_sink)
#
# build pipeline
#
if options.verbose:
print("building pipeline ...", file=sys.stderr)
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(name="DQ")
handler = PSDHandler(mainloop, pipeline, out_path = options.out_path, instrument = instrument, agg_sink = agg_sink)
head, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = options.verbose)
head = pipeparts.mkresample(pipeline, head, quality = 9)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d" % options.sample_rate)
head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 8)
head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, fft_length = options.psd_fft_length, average_samples = 64, median_samples = 7, expand_gaps = True)
head = pipeparts.mkqueue(pipeline, head)
head = pipeparts.mkreblock(pipeline, head)
head = pipeparts.mkgeneric(pipeline, head, "lal_nxydump")
sink = pipeparts.mkappsink(pipeline, head, max_buffers = 1, sync = False)
sink.connect("new-sample", handler.bufhandler)
sink.connect("new-preroll", handler.prehandler)
stream = Stream.from_datasource(gw_data_source_info, instrument, verbose=options.verbose)
stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_message)
logging.info("building pipeline ...")
stream.resample(quality=9) \
.capsfilter("audio/x-raw, rate=%d" % options.sample_rate) \
.queue(max_size_buffers=8) \
.whiten(fft_length=options.psd_fft_length, expand_gaps=True) \
.queue() \
.reblock() \
.nxydump() \
.sink(tracker.on_buffer)
#
# process segment
#
if options.verbose:
print("putting pipeline into READY state ...", file=sys.stderr)
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if gw_data_source_info.data_source not in ("lvshm", "framexmit"):# FIXME what about nds online?
datasource.pipeline_seek_for_gps(pipeline, *gw_data_source_info.seg)
if options.verbose:
print("putting pipeline into PLAYING state ...", file=sys.stderr)
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
if options.verbose:
print("running pipeline ...", file=sys.stderr)
mainloop.run()
if options.verbose:
print("Shutting down", file=sys.stderr)
logging.info("running pipeline ...")
stream.start()
logging.info("shutting down...")
Loading