Commit 68b0a00a authored by Patrick Godwin

lloidhandler.py: change structure of data stored and pushed to kafka within EyeCandy, change name of state routes, add logic to handle sampling rates of state info here rather than downstream
parent d9640f30
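The heart of the change is the shape of the per-route payloads that EyeCandy accumulates in kafka_data between sends. A minimal before/after sketch, reconstructed from the diff below with placeholder numbers: tab-separated "time<TAB>value" strings become parallel time/data arrays that serialize to structured JSON, with the "coinc" route special-cased as a flat list of event dictionaries (keys follow the "%s_%s" % (ifo, attr) pattern used in the diff).

    # before: each route accumulated rows of text that consumers had to re-parse
    kafka_data["H1_snr_history"] = "1187008882.4\t12.3\n1187008883.1\t9.8\n"

    # after: each route holds parallel 'time'/'data' columns
    kafka_data["H1_snr_history"] = {"time": [1187008882.4, 1187008883.1], "data": [12.3, 9.8]}
    kafka_data["coinc"] = [{"H1_snr": 12.3, "H1_end": 1187008882.43}]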
@@ -46,7 +46,7 @@

 #
-from collections import deque
+from collections import defaultdict, deque
 try:
 	from fpconst import NaN
 	from fpconst import PosInf
......@@ -145,6 +145,7 @@ class EyeCandy(object):
self.far_history = deque(maxlen = 300)
self.ram_history = deque(maxlen = 2)
self.ifo_snr_history = dict((instrument, deque(maxlen = 300)) for instrument in instruments)
self.state_sample_rates = {"H1": 16, "L1": 16, "V1": 1}
self.dqvectors = {}
self.statevectors = {}
self.strain = {}
@@ -183,7 +184,7 @@
 		if self.kafka_server is not None:
 			from kafka import KafkaProducer
-			self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('ascii'))
+			self.producer = KafkaProducer(bootstrap_servers=[self.kafka_server], value_serializer=lambda m: json.dumps(m).encode('utf-8'))
 		else:
 			self.producer = None
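The serializer change is a robustness fix rather than a bug fix: json.dumps escapes non-ASCII characters by default (ensure_ascii=True), so its output is always ASCII-safe and the old .encode('ascii') never actually failed, but UTF-8 also round-trips any future escaping choice. A quick illustration:

    import json

    payload = {"note": u"caf\u00e9"}

    # default json.dumps output is pure ASCII, so the old encoding worked
    json.dumps(payload).encode('ascii')

    # UTF-8 also handles raw non-ASCII output, e.g. with ensure_ascii=False
    json.dumps(payload, ensure_ascii=False).encode('utf-8')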
@@ -192,22 +193,8 @@ class EyeCandy(object):
 			# maintaining the bottle route methods, we should keep this a
 			# bit separate for now to not disrupt too much.
-			self.kafka_data = {}
-			self.kafka_data["latency_history"] = ""
-			self.kafka_data["likelihood_history"] = ""
-			self.kafka_data["snr_history"] = ""
-			self.kafka_data["far_history"] = ""
-			self.kafka_data["ram_history"] = ""
-			self.kafka_data["coinc"] = ""
-			for instrument in instruments:
-				self.kafka_data["%s_snr_history" % instrument] = ""
-				self.kafka_data["%s/dqvector_on" % instrument] = ""
-				self.kafka_data["%s/dqvector_off" % instrument] = ""
-				self.kafka_data["%s/dqvector_gap" % instrument] = ""
-				self.kafka_data["%s/statevector_on" % instrument] = ""
-				self.kafka_data["%s/statevector_off" % instrument] = ""
-				self.kafka_data["%s/statevector_gap" % instrument] = ""
-				self.kafka_data["%s/strain_dropped" % instrument] = ""
+			self.kafka_data = defaultdict(lambda: {'time': [], 'data': []})
+			self.kafka_data["coinc"] = []

 	def update(self, events, last_coincs):
 		self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB
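With kafka_data now a defaultdict, a route's {'time': [], 'data': []} payload springs into existence on first access, which is what lets the commit delete the per-instrument route registration above; only "coinc", whose payload is a flat list, must still be set explicitly. A minimal sketch of the behavior (the K1 route is hypothetical):

    from collections import defaultdict

    kafka_data = defaultdict(lambda: {'time': [], 'data': []})
    kafka_data["coinc"] = []  # overrides the default for this one route

    # no setup needed for routes that appear later, e.g. a new detector
    kafka_data["K1_snr_history"]["time"].append(1187008882.4)
    kafka_data["K1_snr_history"]["data"].append(7.5)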
@@ -215,7 +202,8 @@ class EyeCandy(object):
 			max_snr_event = max(events, key = lambda event: event.snr)
 			self.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr))
 			if self.producer is not None:
-				self.kafka_data["%s_snr_history" % max_snr_event.ifo] += "%s\t%s\n" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])
+				for ii, column in enumerate(["time", "data"]):
+					self.kafka_data["%s_snr_history" % max_snr_event.ifo][column].append(float(self.ifo_snr_history[max_snr_event.ifo][-1][ii]))
 		if last_coincs:
 			coinc_inspiral_index = last_coincs.coinc_inspiral_index
 			coinc_event_index = last_coincs.coinc_event_index
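The enumerate(["time", "data"]) idiom used throughout the new code maps the (time, value) tuples stored in the history deques onto the two payload columns by index. A self-contained sketch with placeholder numbers:

    from collections import deque

    ifo_snr_history = {"H1": deque([(1187008882.4, 12.3)], maxlen = 300)}
    payload = {"time": [], "data": []}

    # index 0 of the stored (time, snr) tuple feeds "time", index 1 feeds "data"
    for ii, column in enumerate(["time", "data"]):
        payload[column].append(float(ifo_snr_history["H1"][-1][ii]))

    assert payload == {"time": [1187008882.4], "data": [12.3]}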
@@ -239,7 +227,7 @@
 					coinc_dict["%s_%s" % (sngl_row.ifo, attr)] = float(getattr(sngl_row, attr))
 				coinc_dict["%s_end" % sngl_row.ifo] = float(sngl_row.end)
 				coinc_dict_list.append(coinc_dict)
-			self.kafka_data["coinc"] += json.dumps(coinc_dict_list)
+			self.kafka_data["coinc"].extend(coinc_dict_list)
 			for coinc_inspiral in coinc_inspiral_index.values():
 				# latency in .minimum_duration
 				# FIXME: update when a proper column is available
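Switching "coinc" from string concatenation of json.dumps output to list.extend also fixes a correctness problem: concatenating two JSON documents yields a string no JSON parser will accept, while extending a Python list keeps the route a single valid JSON array when the producer serializes the whole message. A small demonstration:

    import json

    # old approach: two updates concatenate into invalid JSON
    blob = json.dumps([{"snr": 12.3}]) + json.dumps([{"snr": 9.8}])
    try:
        json.loads(blob)
    except ValueError:
        pass  # '[{"snr": 12.3}][{"snr": 9.8}]' cannot be parsed

    # new approach: one list, serialized once by the producer
    coinc = []
    coinc.extend([{"snr": 12.3}])
    coinc.extend([{"snr": 9.8}])
    assert json.loads(json.dumps(coinc)) == [{"snr": 12.3}, {"snr": 9.8}]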
@@ -257,38 +245,45 @@ class EyeCandy(object):
 			self.far_history.append((max_likelihood_t, max_likelihood_far))
 			if self.producer is not None:
-				self.kafka_data["latency_history"] += "%s\t%s\n" % (self.latency_history[-1][0], self.latency_history[-1][1])
-				self.kafka_data["snr_history"] += "%s\t%s\n" % (self.snr_history[-1][0], self.snr_history[-1][1])
-				self.kafka_data["likelihood_history"] += "%s\t%s\n" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])
-				self.kafka_data["far_history"] += "%s\t%s\n" % (self.far_history[-1][0], self.far_history[-1][1])
+				for ii, column in enumerate(["time", "data"]):
+					self.kafka_data["latency_history"][column].append(float(self.latency_history[-1][ii]))
+					self.kafka_data["snr_history"][column].append(float(self.snr_history[-1][ii]))
+					self.kafka_data["likelihood_history"][column].append(float(self.likelihood_history[-1][ii]))
+					self.kafka_data["far_history"][column].append(float(self.far_history[-1][ii]))

 		t = inspiral.now()
 		if self.time_since_last_state is None:
 			self.time_since_last_state = t
 		if self.producer is not None and (t - self.time_since_last_state) > 1:
 			self.time_since_last_state = t
-			self.kafka_data["ram_history"] += "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])
+			for ii, column in enumerate(["time", "data"]):
+				self.kafka_data["ram_history"][column].append(float(self.ram_history[-1][ii]))
 			# send the state vector and dq vector information to kafka
+			# FIXME state sample rate hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
 			for instrument, elem in self.dqvectors.items():
-				self.kafka_data["%s/dqvector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
-				self.kafka_data["%s/dqvector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
-				self.kafka_data["%s/dqvector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
+				for state in ["on", "off", "gap"]:
+					self.kafka_data["%s_dqvector_%s" % (instrument, state)]["time"].append(float(t))
+					self.kafka_data["%s_dqvector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
 			for instrument, elem in self.statevectors.items():
-				self.kafka_data["%s/statevector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
-				self.kafka_data["%s/statevector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
-				self.kafka_data["%s/statevector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
+				for state in ["on", "off", "gap"]:
+					self.kafka_data["%s_statevector_%s" % (instrument, state)]["time"].append(float(t))
+					self.kafka_data["%s_statevector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
 			for instrument, elem in self.strain.items():
 				# I know the name is strain_drop even though it
 				# comes from the "add" property. that is
 				# because audiorate has to "add" samples when
 				# data is dropped.
 				# FIXME don't hard code the rate
-				self.kafka_data["%s/strain_dropped" % instrument] += "%s\t%s" % (t, elem.get_property("add") / 16384.)
+				self.kafka_data["%s_strain_dropped" % instrument]["time"].append(float(t))
+				self.kafka_data["%s_strain_dropped" % instrument]["data"].append(elem.get_property("add") / 16384.)
 			# Send and flush all of the kafka messages and clear the data
 			self.producer.send(self.tag, self.kafka_data)
 			self.producer.flush()
-			for k in self.kafka_data: self.kafka_data[k] = ""
+			for route in self.kafka_data.keys():
+				self.kafka_data[route] = {'time': [], 'data': []}
+			self.kafka_data["coinc"] = []

 	def web_get_latency_histogram(self):
 		with self.lock:
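The division by state_sample_rates is what moves the rate handling upstream: the GStreamer elements report cumulative on/off/gap counts in samples, and dividing by each detector's state-channel rate (16 Hz at the LIGO sites, 1 Hz at Virgo) converts them to seconds, so downstream consumers no longer need per-detector knowledge. A minimal sketch with made-up sample counts:

    state_sample_rates = {"H1": 16, "L1": 16, "V1": 1}

    # hypothetical cumulative "on-samples" counts reported by the gate elements
    on_samples = {"H1": 512, "V1": 32}

    for instrument, samples in on_samples.items():
        seconds = samples / float(state_sample_rates[instrument])
        print("%s: %.1f s of ON time" % (instrument, seconds))
    # H1 and V1 both report 32.0 s despite very different raw counts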